blob: eab85ab897a90cf0254f0f3e16f531ae3ff403eb [file] [log] [blame]
Copybara854996b2021-09-07 19:36:02 +00001# Copyright 2016 The Chromium Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style
3# license that can be found in the LICENSE file or at
4# https://developers.google.com/open-source/licenses/bsd
5
6"""A set of functions that provide persistence for Monorail issue tracking.
7
8This module provides functions to get, update, create, and (in some
9cases) delete each type of business object. It provides a logical
10persistence layer on top of an SQL database.
11
12Business objects are described in tracker_pb2.py and tracker_bizobj.py.
13"""
14from __future__ import print_function
15from __future__ import division
16from __future__ import absolute_import
17
18import collections
19import json
20import logging
21import os
22import time
23import uuid
24
25from google.appengine.api import app_identity
26from google.appengine.api import images
27from third_party import cloudstorage
28
29import settings
30from features import filterrules_helpers
31from framework import authdata
32from framework import exceptions
33from framework import framework_bizobj
34from framework import framework_constants
35from framework import framework_helpers
36from framework import gcs_helpers
37from framework import permissions
38from framework import sql
39from infra_libs import ts_mon
40from proto import project_pb2
41from proto import tracker_pb2
42from services import caches
43from services import tracker_fulltext
44from tracker import tracker_bizobj
45from tracker import tracker_helpers
46
# TODO(jojwang): monorail:4693, remove this after all 'stable-full'
# gates have been renamed to 'stable'.
# Maps each legacy gate name to its replacement and vice versa, so lookups
# succeed regardless of which spelling an issue still carries.
FLT_EQUIVALENT_GATES = {'stable-full': 'stable',
                        'stable': 'stable-full'}

# Names of the SQL tables used by this persistence layer.  Issue data:
ISSUE_TABLE_NAME = 'Issue'
ISSUESUMMARY_TABLE_NAME = 'IssueSummary'
ISSUE2LABEL_TABLE_NAME = 'Issue2Label'
ISSUE2COMPONENT_TABLE_NAME = 'Issue2Component'
ISSUE2CC_TABLE_NAME = 'Issue2Cc'
ISSUE2NOTIFY_TABLE_NAME = 'Issue2Notify'
ISSUE2FIELDVALUE_TABLE_NAME = 'Issue2FieldValue'
# Comment and attachment data:
COMMENT_TABLE_NAME = 'Comment'
COMMENTCONTENT_TABLE_NAME = 'CommentContent'
COMMENTIMPORTER_TABLE_NAME = 'CommentImporter'
ATTACHMENT_TABLE_NAME = 'Attachment'
# Relations between issues (blocking, merged-into, dangling/external):
ISSUERELATION_TABLE_NAME = 'IssueRelation'
DANGLINGRELATION_TABLE_NAME = 'DanglingIssueRelation'
ISSUEUPDATE_TABLE_NAME = 'IssueUpdate'
ISSUEFORMERLOCATIONS_TABLE_NAME = 'IssueFormerLocations'
# Cron/bookkeeping tables:
REINDEXQUEUE_TABLE_NAME = 'ReindexQueue'
LOCALIDCOUNTER_TABLE_NAME = 'LocalIDCounter'
# Snapshot tables used for charting issue history:
ISSUESNAPSHOT_TABLE_NAME = 'IssueSnapshot'
ISSUESNAPSHOT2CC_TABLE_NAME = 'IssueSnapshot2Cc'
ISSUESNAPSHOT2COMPONENT_TABLE_NAME = 'IssueSnapshot2Component'
ISSUESNAPSHOT2LABEL_TABLE_NAME = 'IssueSnapshot2Label'
# Approval-flow (launch gate) tables:
ISSUEPHASEDEF_TABLE_NAME = 'IssuePhaseDef'
ISSUE2APPROVALVALUE_TABLE_NAME = 'Issue2ApprovalValue'
ISSUEAPPROVAL2APPROVER_TABLE_NAME = 'IssueApproval2Approver'
ISSUEAPPROVAL2COMMENT_TABLE_NAME = 'IssueApproval2Comment'


# Column lists for each table above.  The order of names here must match the
# tuple order that the corresponding _Unpack*/_Deserialize* helpers expect.
ISSUE_COLS = [
    'id', 'project_id', 'local_id', 'status_id', 'owner_id', 'reporter_id',
    'opened', 'closed', 'modified',
    'owner_modified', 'status_modified', 'component_modified',
    'derived_owner_id', 'derived_status_id',
    'deleted', 'star_count', 'attachment_count', 'is_spam']
ISSUESUMMARY_COLS = ['issue_id', 'summary']
ISSUE2LABEL_COLS = ['issue_id', 'label_id', 'derived']
ISSUE2COMPONENT_COLS = ['issue_id', 'component_id', 'derived']
ISSUE2CC_COLS = ['issue_id', 'cc_id', 'derived']
ISSUE2NOTIFY_COLS = ['issue_id', 'email']
ISSUE2FIELDVALUE_COLS = [
    'issue_id', 'field_id', 'int_value', 'str_value', 'user_id', 'date_value',
    'url_value', 'derived', 'phase_id']
# Explicitly specify column 'Comment.id' to allow joins on other tables that
# have an 'id' column.
COMMENT_COLS = [
    'Comment.id', 'issue_id', 'created', 'Comment.project_id', 'commenter_id',
    'deleted_by', 'Comment.is_spam', 'is_description',
    'commentcontent_id']  # Note: commentcontent_id must be last.
COMMENTCONTENT_COLS = [
    'CommentContent.id', 'content', 'inbound_message']
COMMENTIMPORTER_COLS = ['comment_id', 'importer_id']
ABBR_COMMENT_COLS = ['Comment.id', 'commenter_id', 'deleted_by',
    'is_description']
ATTACHMENT_COLS = [
    'id', 'issue_id', 'comment_id', 'filename', 'filesize', 'mimetype',
    'deleted', 'gcs_object_id']
ISSUERELATION_COLS = ['issue_id', 'dst_issue_id', 'kind', 'rank']
ABBR_ISSUERELATION_COLS = ['dst_issue_id', 'rank']
DANGLINGRELATION_COLS = [
    'issue_id', 'dst_issue_project', 'dst_issue_local_id',
    'ext_issue_identifier', 'kind']
ISSUEUPDATE_COLS = [
    'id', 'issue_id', 'comment_id', 'field', 'old_value', 'new_value',
    'added_user_id', 'removed_user_id', 'custom_field_name']
ISSUEFORMERLOCATIONS_COLS = ['issue_id', 'project_id', 'local_id']
REINDEXQUEUE_COLS = ['issue_id', 'created']
ISSUESNAPSHOT_COLS = ['id', 'issue_id', 'shard', 'project_id', 'local_id',
    'reporter_id', 'owner_id', 'status_id', 'period_start', 'period_end',
    'is_open']
ISSUESNAPSHOT2CC_COLS = ['issuesnapshot_id', 'cc_id']
ISSUESNAPSHOT2COMPONENT_COLS = ['issuesnapshot_id', 'component_id']
ISSUESNAPSHOT2LABEL_COLS = ['issuesnapshot_id', 'label_id']
ISSUEPHASEDEF_COLS = ['id', 'name', 'rank']
ISSUE2APPROVALVALUE_COLS = ['approval_id', 'issue_id', 'phase_id',
    'status', 'setter_id', 'set_on']
ISSUEAPPROVAL2APPROVER_COLS = ['approval_id', 'approver_id', 'issue_id']
ISSUEAPPROVAL2COMMENT_COLS = ['approval_id', 'comment_id']

# Number of rows to process per batch in bulk operations.
CHUNK_SIZE = 1000
130
131
class IssueIDTwoLevelCache(caches.AbstractTwoLevelCache):
  """Class to manage RAM and memcache for Issue IDs.

  Maps (project_id, local_id) pairs to global issue IDs, falling back to
  the Issue table on a cache miss.
  """

  def __init__(self, cache_manager, issue_service):
    super(IssueIDTwoLevelCache, self).__init__(
        cache_manager, 'issue_id', 'issue_id:', int,
        max_size=settings.issue_cache_max_size)
    self.issue_service = issue_service

  def _MakeCache(self, cache_manager, kind, max_size=None):
    """Override normal RamCache creation with ValueCentricRamCache."""
    return caches.ValueCentricRamCache(cache_manager, kind, max_size=max_size)

  def _DeserializeIssueIDs(self, project_local_issue_ids):
    """Convert database rows into a dict {(project_id, local_id): issue_id}."""
    id_map = {}
    for project_id, local_id, issue_id in project_local_issue_ids:
      id_map[(project_id, local_id)] = issue_id
    return id_map

  def FetchItems(self, cnxn, keys):
    """On RAM and memcache miss, hit the database."""
    # Group the requested local IDs by project so we can issue one IN()
    # condition per project, OR'd together.
    local_ids_by_project = collections.defaultdict(list)
    for project_id, local_id in keys:
      local_ids_by_project[project_id].append(local_id)

    where = []
    for project_id, local_ids in local_ids_by_project.items():
      cond = ('(Issue.project_id = %%s AND Issue.local_id IN (%s))' %
              sql.PlaceHolders(local_ids))
      where.append((cond, [project_id] + local_ids))

    rows = self.issue_service.issue_tbl.Select(
        cnxn, cols=['project_id', 'local_id', 'id'],
        where=where, or_where_conds=True)
    return self._DeserializeIssueIDs(rows)

  def _KeyToStr(self, key):
    """This cache uses pairs of ints as keys. Convert them to strings."""
    project_id, local_id = key
    return '%d,%d' % (project_id, local_id)

  def _StrToKey(self, key_str):
    """This cache uses pairs of ints as keys. Convert them from strings."""
    project_id_part, local_id_part = key_str.split(',')
    return int(project_id_part), int(local_id_part)
175
176
class IssueTwoLevelCache(caches.AbstractTwoLevelCache):
  """Class to manage RAM and memcache for Issue PBs.

  On a cache miss, FetchItems() loads an issue and all of its related rows
  (summary, labels, components, CCs, field values, relations, phases, and
  approvals) from the DB and assembles full Issue PBs.
  """

  def __init__(
      self, cache_manager, issue_service, project_service, config_service):
    super(IssueTwoLevelCache, self).__init__(
        cache_manager, 'issue', 'issue:', tracker_pb2.Issue,
        max_size=settings.issue_cache_max_size)
    self.issue_service = issue_service
    self.project_service = project_service
    self.config_service = config_service

  def _UnpackIssue(self, cnxn, issue_row):
    """Partially construct an issue object using info from a DB row.

    Args:
      cnxn: connection to SQL database.
      issue_row: tuple of values in ISSUE_COLS order.

    Returns:
      A partially filled-in Issue PB; related rows (labels, ccs, etc.) are
      attached later by _DeserializeIssues().
    """
    (issue_id, project_id, local_id, status_id, owner_id, reporter_id,
     opened, closed, modified, owner_modified, status_modified,
     component_modified, derived_owner_id, derived_status_id,
     deleted, star_count, attachment_count, is_spam) = issue_row

    issue = tracker_pb2.Issue()
    project = self.project_service.GetProject(cnxn, project_id)
    issue.project_name = project.project_name
    issue.issue_id = issue_id
    issue.project_id = project_id
    issue.local_id = local_id
    if status_id is not None:
      status = self.config_service.LookupStatus(cnxn, project_id, status_id)
      issue.status = status
    issue.owner_id = owner_id or 0
    issue.reporter_id = reporter_id or 0
    issue.derived_owner_id = derived_owner_id or 0
    if derived_status_id is not None:
      derived_status = self.config_service.LookupStatus(
          cnxn, project_id, derived_status_id)
      issue.derived_status = derived_status
    issue.deleted = bool(deleted)
    # Timestamps are only assigned when present so that unset columns leave
    # the PB fields at their defaults.
    if opened:
      issue.opened_timestamp = opened
    if closed:
      issue.closed_timestamp = closed
    if modified:
      issue.modified_timestamp = modified
    if owner_modified:
      issue.owner_modified_timestamp = owner_modified
    if status_modified:
      issue.status_modified_timestamp = status_modified
    if component_modified:
      issue.component_modified_timestamp = component_modified
    issue.star_count = star_count
    issue.attachment_count = attachment_count
    issue.is_spam = bool(is_spam)
    return issue

  def _UnpackFieldValue(self, fv_row):
    """Construct a field value object from a DB row.

    Returns:
      A (FieldValue, issue_id) pair so the caller can attach the value to
      the right issue.
    """
    (issue_id, field_id, int_value, str_value, user_id, date_value, url_value,
     derived, phase_id) = fv_row
    fv = tracker_bizobj.MakeFieldValue(
        field_id, int_value, str_value, user_id, date_value, url_value,
        bool(derived), phase_id=phase_id)
    return fv, issue_id

  def _UnpackApprovalValue(self, av_row):
    """Construct an ApprovalValue PB from a DB row.

    Returns:
      An (ApprovalValue, issue_id) pair; approver_ids are filled in later
      by _DeserializeIssues().
    """
    (approval_id, issue_id, phase_id, status, setter_id, set_on) = av_row
    if status:
      # The DB stores the enum name in lowercase.
      status_enum = tracker_pb2.ApprovalStatus(status.upper())
    else:
      status_enum = tracker_pb2.ApprovalStatus.NOT_SET
    av = tracker_pb2.ApprovalValue(
        approval_id=approval_id, setter_id=setter_id, set_on=set_on,
        status=status_enum, phase_id=phase_id)
    return av, issue_id

  def _UnpackPhase(self, phase_row):
    """Construct a Phase PB from a DB row."""
    (phase_id, name, rank) = phase_row
    phase = tracker_pb2.Phase(
        phase_id=phase_id, name=name, rank=rank)
    return phase

  def _DeserializeIssues(
      self, cnxn, issue_rows, summary_rows, label_rows, component_rows,
      cc_rows, notify_rows, fieldvalue_rows, relation_rows,
      dangling_relation_rows, phase_rows, approvalvalue_rows,
      av_approver_rows):
    """Convert the given DB rows into a dict of Issue PBs."""
    results_dict = {}
    for issue_row in issue_rows:
      issue = self._UnpackIssue(cnxn, issue_row)
      results_dict[issue.issue_id] = issue

    for issue_id, summary in summary_rows:
      results_dict[issue_id].summary = summary

    # TODO(jrobbins): it would be nice to order labels by rank and name.
    for issue_id, label_id, derived in label_rows:
      issue = results_dict.get(issue_id)
      if not issue:
        logging.info('Got label for an unknown issue: %r %r',
                     label_rows, issue_rows)
        continue
      label = self.config_service.LookupLabel(cnxn, issue.project_id, label_id)
      assert label, ('Label ID %r on IID %r not found in project %r' %
                     (label_id, issue_id, issue.project_id))
      if derived:
        results_dict[issue_id].derived_labels.append(label)
      else:
        results_dict[issue_id].labels.append(label)

    for issue_id, component_id, derived in component_rows:
      if derived:
        results_dict[issue_id].derived_component_ids.append(component_id)
      else:
        results_dict[issue_id].component_ids.append(component_id)

    for issue_id, user_id, derived in cc_rows:
      if derived:
        results_dict[issue_id].derived_cc_ids.append(user_id)
      else:
        results_dict[issue_id].cc_ids.append(user_id)

    for issue_id, email in notify_rows:
      results_dict[issue_id].derived_notify_addrs.append(email)

    for fv_row in fieldvalue_rows:
      fv, issue_id = self._UnpackFieldValue(fv_row)
      results_dict[issue_id].field_values.append(fv)

    phases_by_id = {}
    for phase_row in phase_rows:
      phase = self._UnpackPhase(phase_row)
      phases_by_id[phase.phase_id] = phase

    # Index approvers by (approval_id, issue_id) so each ApprovalValue below
    # picks up only its own issue's approvers.
    approvers_dict = collections.defaultdict(list)
    for approver_row in av_approver_rows:
      approval_id, approver_id, issue_id = approver_row
      approvers_dict[approval_id, issue_id].append(approver_id)

    for av_row in approvalvalue_rows:
      av, issue_id = self._UnpackApprovalValue(av_row)
      av.approver_ids = approvers_dict[av.approval_id, issue_id]
      results_dict[issue_id].approval_values.append(av)
      if av.phase_id:
        phase = phases_by_id[av.phase_id]
        issue_phases = results_dict[issue_id].phases
        if phase not in issue_phases:
          issue_phases.append(phase)
    # Order issue phases
    for issue in results_dict.values():
      if issue.phases:
        issue.phases.sort(key=lambda phase: phase.rank)

    for issue_id, dst_issue_id, kind, rank in relation_rows:
      src_issue = results_dict.get(issue_id)
      dst_issue = results_dict.get(dst_issue_id)
      # At least one endpoint must be among the issues being deserialized;
      # the other may be outside the requested set.
      assert src_issue or dst_issue, (
          'Neither source issue %r nor dest issue %r was found' %
          (issue_id, dst_issue_id))
      if src_issue:
        if kind == 'blockedon':
          src_issue.blocked_on_iids.append(dst_issue_id)
          src_issue.blocked_on_ranks.append(rank)
        elif kind == 'mergedinto':
          src_issue.merged_into = dst_issue_id
        else:
          logging.info('unknown relation kind %r', kind)
          continue

      if dst_issue:
        if kind == 'blockedon':
          dst_issue.blocking_iids.append(issue_id)

    for row in dangling_relation_rows:
      issue_id, dst_issue_proj, dst_issue_id, ext_id, kind = row
      # NOTE(review): unlike the loop above, a missing src_issue here would
      # raise AttributeError; rows are fetched by issue_id so src_issue is
      # presumably always present -- confirm before relying on it.
      src_issue = results_dict.get(issue_id)
      if kind == 'blockedon':
        src_issue.dangling_blocked_on_refs.append(
            tracker_bizobj.MakeDanglingIssueRef(dst_issue_proj,
                                                dst_issue_id, ext_id))
      elif kind == 'blocking':
        src_issue.dangling_blocking_refs.append(
            tracker_bizobj.MakeDanglingIssueRef(dst_issue_proj, dst_issue_id,
                                                ext_id))
      elif kind == 'mergedinto':
        src_issue.merged_into_external = ext_id
      else:
        # Fixed: logging.warn is a deprecated alias (removed in Python
        # 3.13); also corrected the 'danging' typo in the message.
        logging.warning('unhandled dangling relation kind %r', kind)
        continue

    return results_dict

  # Note: sharding is used to here to allow us to load issues from the replicas
  # without placing load on the primary DB. Writes are not sharded.
  # pylint: disable=arguments-differ
  def FetchItems(self, cnxn, issue_ids, shard_id=None):
    """Retrieve and deserialize issues.

    Args:
      cnxn: connection to SQL database.
      issue_ids: global issue IDs to fetch.
      shard_id: optional replica shard to read from; None reads the primary.

    Returns:
      A dict {issue_id: Issue PB} for the issues that were found.
    """
    issue_rows = self.issue_service.issue_tbl.Select(
        cnxn, cols=ISSUE_COLS, id=issue_ids, shard_id=shard_id)

    summary_rows = self.issue_service.issuesummary_tbl.Select(
        cnxn, cols=ISSUESUMMARY_COLS, shard_id=shard_id, issue_id=issue_ids)
    label_rows = self.issue_service.issue2label_tbl.Select(
        cnxn, cols=ISSUE2LABEL_COLS, shard_id=shard_id, issue_id=issue_ids)
    component_rows = self.issue_service.issue2component_tbl.Select(
        cnxn, cols=ISSUE2COMPONENT_COLS, shard_id=shard_id, issue_id=issue_ids)
    cc_rows = self.issue_service.issue2cc_tbl.Select(
        cnxn, cols=ISSUE2CC_COLS, shard_id=shard_id, issue_id=issue_ids)
    notify_rows = self.issue_service.issue2notify_tbl.Select(
        cnxn, cols=ISSUE2NOTIFY_COLS, shard_id=shard_id, issue_id=issue_ids)
    fieldvalue_rows = self.issue_service.issue2fieldvalue_tbl.Select(
        cnxn, cols=ISSUE2FIELDVALUE_COLS, shard_id=shard_id,
        issue_id=issue_ids)
    approvalvalue_rows = self.issue_service.issue2approvalvalue_tbl.Select(
        cnxn, cols=ISSUE2APPROVALVALUE_COLS, issue_id=issue_ids)
    # Only fetch phase definitions that some approval value references.
    phase_ids = [av_row[2] for av_row in approvalvalue_rows]
    phase_rows = []
    if phase_ids:
      phase_rows = self.issue_service.issuephasedef_tbl.Select(
          cnxn, cols=ISSUEPHASEDEF_COLS, id=list(set(phase_ids)))
    av_approver_rows = self.issue_service.issueapproval2approver_tbl.Select(
        cnxn, cols=ISSUEAPPROVAL2APPROVER_COLS, issue_id=issue_ids)
    if issue_ids:
      ph = sql.PlaceHolders(issue_ids)
      blocked_on_rows = self.issue_service.issuerelation_tbl.Select(
          cnxn, cols=ISSUERELATION_COLS, issue_id=issue_ids, kind='blockedon',
          order_by=[('issue_id', []), ('rank DESC', []), ('dst_issue_id', [])])
      blocking_rows = self.issue_service.issuerelation_tbl.Select(
          cnxn, cols=ISSUERELATION_COLS, dst_issue_id=issue_ids,
          kind='blockedon', order_by=[('issue_id', []), ('dst_issue_id', [])])
      # Avoid processing rows twice when both endpoints were requested.
      unique_blocking = tuple(
          row for row in blocking_rows if row not in blocked_on_rows)
      merge_rows = self.issue_service.issuerelation_tbl.Select(
          cnxn, cols=ISSUERELATION_COLS,
          where=[('(issue_id IN (%s) OR dst_issue_id IN (%s))' % (ph, ph),
                  issue_ids + issue_ids),
                 ('kind != %s', ['blockedon'])])
      relation_rows = blocked_on_rows + unique_blocking + merge_rows
      dangling_relation_rows = self.issue_service.danglingrelation_tbl.Select(
          cnxn, cols=DANGLINGRELATION_COLS, issue_id=issue_ids)
    else:
      relation_rows = []
      dangling_relation_rows = []

    issue_dict = self._DeserializeIssues(
        cnxn, issue_rows, summary_rows, label_rows, component_rows, cc_rows,
        notify_rows, fieldvalue_rows, relation_rows, dangling_relation_rows,
        phase_rows, approvalvalue_rows, av_approver_rows)
    logging.info('IssueTwoLevelCache.FetchItems returning: %r', issue_dict)
    return issue_dict
427
428
class CommentTwoLevelCache(caches.AbstractTwoLevelCache):
  """Class to manage RAM and memcache for IssueComment PBs."""

  def __init__(self, cache_manager, issue_svc):
    super(CommentTwoLevelCache, self).__init__(
        cache_manager, 'comment', 'comment:', tracker_pb2.IssueComment,
        max_size=settings.comment_cache_max_size)
    self.issue_svc = issue_svc

  # pylint: disable=arguments-differ
  def FetchItems(self, cnxn, keys, shard_id=None):
    """On cache miss, load comments and their related rows from the DB."""
    comment_rows = self.issue_svc.comment_tbl.Select(cnxn,
        cols=COMMENT_COLS, id=keys, shard_id=shard_id)

    # A replica can lag behind the primary; if any requested comments are
    # missing from the shard, retry the whole read against the primary DB.
    if len(comment_rows) < len(keys):
      self.issue_svc.replication_lag_retries.increment()
      logging.info('issue3755: expected %d, but got %d rows from shard %d',
                   len(keys), len(comment_rows), shard_id)
      shard_id = None  # Will use Primary DB.
      comment_rows = self.issue_svc.comment_tbl.Select(
          cnxn, cols=COMMENT_COLS, id=keys, shard_id=None)
      logging.info(
          'Retry got %d comment rows from the primary DB', len(comment_rows))

    comment_ids = [row[0] for row in comment_rows]
    # commentcontent_id is the last column of COMMENT_COLS.
    content_ids = [row[-1] for row in comment_rows]
    content_rows = self.issue_svc.commentcontent_tbl.Select(
        cnxn, cols=COMMENTCONTENT_COLS, id=content_ids,
        shard_id=shard_id)
    approval_rows = self.issue_svc.issueapproval2comment_tbl.Select(
        cnxn, cols=ISSUEAPPROVAL2COMMENT_COLS, comment_id=comment_ids)
    amendment_rows = self.issue_svc.issueupdate_tbl.Select(
        cnxn, cols=ISSUEUPDATE_COLS, comment_id=comment_ids,
        shard_id=shard_id)
    attachment_rows = self.issue_svc.attachment_tbl.Select(
        cnxn, cols=ATTACHMENT_COLS, comment_id=comment_ids,
        shard_id=shard_id)
    importer_rows = self.issue_svc.commentimporter_tbl.Select(
        cnxn, cols=COMMENTIMPORTER_COLS, comment_id=comment_ids,
        shard_id=shard_id)

    comments = self.issue_svc._DeserializeComments(
        comment_rows, content_rows, amendment_rows, attachment_rows,
        approval_rows, importer_rows)

    return {comment.id: comment for comment in comments}
476
477
478class IssueService(object):
479 """The persistence layer for Monorail's issues, comments, and attachments."""
480 spam_labels = ts_mon.CounterMetric(
481 'monorail/issue_svc/spam_label',
482 'Issues created, broken down by spam label.',
483 [ts_mon.StringField('type')])
484 replication_lag_retries = ts_mon.CounterMetric(
485 'monorail/issue_svc/replication_lag_retries',
486 'Counts times that loading comments from a replica failed',
487 [])
488 issue_creations = ts_mon.CounterMetric(
489 'monorail/issue_svc/issue_creations',
490 'Counts times that issues were created',
491 [])
492 comment_creations = ts_mon.CounterMetric(
493 'monorail/issue_svc/comment_creations',
494 'Counts times that comments were created',
495 [])
496
497 def __init__(self, project_service, config_service, cache_manager,
498 chart_service):
499 """Initialize this object so that it is ready to use.
500
501 Args:
502 project_service: services object for project info.
503 config_service: services object for tracker configuration info.
504 cache_manager: local cache with distributed invalidation.
505 chart_service (ChartService): An instance of ChartService.
506 """
507 # Tables that represent issue data.
508 self.issue_tbl = sql.SQLTableManager(ISSUE_TABLE_NAME)
509 self.issuesummary_tbl = sql.SQLTableManager(ISSUESUMMARY_TABLE_NAME)
510 self.issue2label_tbl = sql.SQLTableManager(ISSUE2LABEL_TABLE_NAME)
511 self.issue2component_tbl = sql.SQLTableManager(ISSUE2COMPONENT_TABLE_NAME)
512 self.issue2cc_tbl = sql.SQLTableManager(ISSUE2CC_TABLE_NAME)
513 self.issue2notify_tbl = sql.SQLTableManager(ISSUE2NOTIFY_TABLE_NAME)
514 self.issue2fieldvalue_tbl = sql.SQLTableManager(ISSUE2FIELDVALUE_TABLE_NAME)
515 self.issuerelation_tbl = sql.SQLTableManager(ISSUERELATION_TABLE_NAME)
516 self.danglingrelation_tbl = sql.SQLTableManager(DANGLINGRELATION_TABLE_NAME)
517 self.issueformerlocations_tbl = sql.SQLTableManager(
518 ISSUEFORMERLOCATIONS_TABLE_NAME)
519 self.issuesnapshot_tbl = sql.SQLTableManager(ISSUESNAPSHOT_TABLE_NAME)
520 self.issuesnapshot2cc_tbl = sql.SQLTableManager(
521 ISSUESNAPSHOT2CC_TABLE_NAME)
522 self.issuesnapshot2component_tbl = sql.SQLTableManager(
523 ISSUESNAPSHOT2COMPONENT_TABLE_NAME)
524 self.issuesnapshot2label_tbl = sql.SQLTableManager(
525 ISSUESNAPSHOT2LABEL_TABLE_NAME)
526 self.issuephasedef_tbl = sql.SQLTableManager(ISSUEPHASEDEF_TABLE_NAME)
527 self.issue2approvalvalue_tbl = sql.SQLTableManager(
528 ISSUE2APPROVALVALUE_TABLE_NAME)
529 self.issueapproval2approver_tbl = sql.SQLTableManager(
530 ISSUEAPPROVAL2APPROVER_TABLE_NAME)
531 self.issueapproval2comment_tbl = sql.SQLTableManager(
532 ISSUEAPPROVAL2COMMENT_TABLE_NAME)
533
534 # Tables that represent comments.
535 self.comment_tbl = sql.SQLTableManager(COMMENT_TABLE_NAME)
536 self.commentcontent_tbl = sql.SQLTableManager(COMMENTCONTENT_TABLE_NAME)
537 self.commentimporter_tbl = sql.SQLTableManager(COMMENTIMPORTER_TABLE_NAME)
538 self.issueupdate_tbl = sql.SQLTableManager(ISSUEUPDATE_TABLE_NAME)
539 self.attachment_tbl = sql.SQLTableManager(ATTACHMENT_TABLE_NAME)
540
541 # Tables for cron tasks.
542 self.reindexqueue_tbl = sql.SQLTableManager(REINDEXQUEUE_TABLE_NAME)
543
544 # Tables for generating sequences of local IDs.
545 self.localidcounter_tbl = sql.SQLTableManager(LOCALIDCOUNTER_TABLE_NAME)
546
547 # Like a dictionary {(project_id, local_id): issue_id}
548 # Use value centric cache here because we cannot store a tuple in the
549 # Invalidate table.
550 self.issue_id_2lc = IssueIDTwoLevelCache(cache_manager, self)
551 # Like a dictionary {issue_id: issue}
552 self.issue_2lc = IssueTwoLevelCache(
553 cache_manager, self, project_service, config_service)
554
555 # Like a dictionary {comment_id: comment)
556 self.comment_2lc = CommentTwoLevelCache(
557 cache_manager, self)
558
559 self._config_service = config_service
560 self.chart_service = chart_service
561
562 ### Issue ID lookups
563
564 def LookupIssueIDsFollowMoves(self, cnxn, project_local_id_pairs):
565 # type: (MonorailConnection, Sequence[Tuple(int, int)]) ->
566 # (Sequence[int], Sequence[Tuple(int, int)])
567 """Find the global issue IDs given the project ID and local ID of each.
568
569 If any (project_id, local_id) pairs refer to an issue that has been moved,
570 the issue ID will still be returned.
571
572 Args:
573 cnxn: Monorail connection.
574 project_local_id_pairs: (project_id, local_id) pairs to look up.
575
576 Returns:
577 A tuple of two items.
578 1. A sequence of global issue IDs in the `project_local_id_pairs` order.
579 2. A sequence of (project_id, local_id) containing each pair provided
580 for which no matching issue is found.
581 """
582
583 issue_id_dict, misses = self.issue_id_2lc.GetAll(
584 cnxn, project_local_id_pairs)
585 for miss in misses:
586 project_id, local_id = miss
587 issue_id = int(
588 self.issueformerlocations_tbl.SelectValue(
589 cnxn,
590 'issue_id',
591 default=0,
592 project_id=project_id,
593 local_id=local_id))
594 if issue_id:
595 misses.remove(miss)
596 issue_id_dict[miss] = issue_id
597 # Put the Issue IDs in the order specified by project_local_id_pairs
598 issue_ids = [
599 issue_id_dict[pair]
600 for pair in project_local_id_pairs
601 if pair in issue_id_dict
602 ]
603
604 return issue_ids, misses
605
606 def LookupIssueIDs(self, cnxn, project_local_id_pairs):
607 """Find the global issue IDs given the project ID and local ID of each."""
608 issue_id_dict, misses = self.issue_id_2lc.GetAll(
609 cnxn, project_local_id_pairs)
610
611 # Put the Issue IDs in the order specified by project_local_id_pairs
612 issue_ids = [issue_id_dict[pair] for pair in project_local_id_pairs
613 if pair in issue_id_dict]
614
615 return issue_ids, misses
616
617 def LookupIssueID(self, cnxn, project_id, local_id):
618 """Find the global issue ID given the project ID and local ID."""
619 issue_ids, _misses = self.LookupIssueIDs(cnxn, [(project_id, local_id)])
620 try:
621 return issue_ids[0]
622 except IndexError:
623 raise exceptions.NoSuchIssueException()
624
625 def ResolveIssueRefs(
626 self, cnxn, ref_projects, default_project_name, refs):
627 """Look up all the referenced issues and return their issue_ids.
628
629 Args:
630 cnxn: connection to SQL database.
631 ref_projects: pre-fetched dict {project_name: project} of all projects
632 mentioned in the refs as well as the default project.
633 default_project_name: string name of the current project, this is used
634 when the project_name in a ref is None.
635 refs: list of (project_name, local_id) pairs. These are parsed from
636 textual references in issue descriptions, comments, and the input
637 in the blocked-on field.
638
639 Returns:
640 A list of issue_ids for all the referenced issues. References to issues
641 in deleted projects and any issues not found are simply ignored.
642 """
643 if not refs:
644 return [], []
645
646 project_local_id_pairs = []
647 for project_name, local_id in refs:
648 project = ref_projects.get(project_name or default_project_name)
649 if not project or project.state == project_pb2.ProjectState.DELETABLE:
650 continue # ignore any refs to issues in deleted projects
651 project_local_id_pairs.append((project.project_id, local_id))
652
653 return self.LookupIssueIDs(cnxn, project_local_id_pairs) # tuple
654
655 def LookupIssueRefs(self, cnxn, issue_ids):
656 """Return {issue_id: (project_name, local_id)} for each issue_id."""
657 issue_dict, _misses = self.GetIssuesDict(cnxn, issue_ids)
658 return {
659 issue_id: (issue.project_name, issue.local_id)
660 for issue_id, issue in issue_dict.items()}
661
662 ### Issue objects
663
664 def CreateIssue(
665 self,
666 cnxn,
667 services,
668 issue,
669 marked_description,
670 attachments=None,
671 index_now=False,
672 importer_id=None):
673 """Create and store a new issue with all the given information.
674
675 Args:
676 cnxn: connection to SQL database.
677 services: persistence layer for users, issues, and projects.
678 issue: Issue PB to create.
679 marked_description: issue description with initial HTML markup.
680 attachments: [(filename, contents, mimetype),...] attachments uploaded at
681 the time the comment was made.
682 index_now: True if the issue should be updated in the full text index.
683 importer_id: optional user ID of API client importing issues for users.
684
685 Returns:
686 A tuple (the newly created Issue PB and Comment PB for the
687 issue description).
688 """
689 project_id = issue.project_id
690 reporter_id = issue.reporter_id
691 timestamp = issue.opened_timestamp
692 config = self._config_service.GetProjectConfig(cnxn, project_id)
693
694 iids_to_invalidate = set()
695 if len(issue.blocked_on_iids) != 0:
696 iids_to_invalidate.update(issue.blocked_on_iids)
697 if len(issue.blocking_iids) != 0:
698 iids_to_invalidate.update(issue.blocking_iids)
699
700 comment = self._MakeIssueComment(
701 project_id, reporter_id, marked_description,
702 attachments=attachments, timestamp=timestamp,
703 is_description=True, importer_id=importer_id)
704
705 reporter = services.user.GetUser(cnxn, reporter_id)
706 project = services.project.GetProject(cnxn, project_id)
707 reporter_auth = authdata.AuthData.FromUserID(cnxn, reporter_id, services)
708 is_project_member = framework_bizobj.UserIsInProject(
709 project, reporter_auth.effective_ids)
710 classification = services.spam.ClassifyIssue(
711 issue, comment, reporter, is_project_member)
712
713 if classification['confidence_is_spam'] > settings.classifier_spam_thresh:
714 issue.is_spam = True
715 predicted_label = 'spam'
716 else:
717 predicted_label = 'ham'
718
719 logging.info('classified new issue as %s' % predicted_label)
720 self.spam_labels.increment({'type': predicted_label})
721
722 # Create approval surveys
723 approval_comments = []
724 if len(issue.approval_values) != 0:
725 approval_defs_by_id = {ad.approval_id: ad for ad in config.approval_defs}
726 for av in issue.approval_values:
727 ad = approval_defs_by_id.get(av.approval_id)
728 if ad:
729 survey = ''
730 if ad.survey:
731 questions = ad.survey.split('\n')
732 survey = '\n'.join(['<b>' + q + '</b>' for q in questions])
733 approval_comments.append(self._MakeIssueComment(
734 project_id, reporter_id, survey, timestamp=timestamp,
735 is_description=True, approval_id=ad.approval_id))
736 else:
737 logging.info('Could not find ApprovalDef with approval_id %r',
738 av.approval_id)
739
740 issue.local_id = self.AllocateNextLocalID(cnxn, project_id)
741 self.issue_creations.increment()
742 issue_id = self.InsertIssue(cnxn, issue)
743 comment.issue_id = issue_id
744 self.InsertComment(cnxn, comment)
745 for approval_comment in approval_comments:
746 approval_comment.issue_id = issue_id
747 self.InsertComment(cnxn, approval_comment)
748
749 issue.issue_id = issue_id
750
751 # ClassifyIssue only returns confidence_is_spam, but
752 # RecordClassifierIssueVerdict records confidence of
753 # ham or spam. Therefore if ham, invert score.
754 confidence = classification['confidence_is_spam']
755 if not issue.is_spam:
756 confidence = 1.0 - confidence
757
758 services.spam.RecordClassifierIssueVerdict(
759 cnxn, issue, predicted_label=='spam',
760 confidence, classification['failed_open'])
761
762 if permissions.HasRestrictions(issue, 'view'):
763 self._config_service.InvalidateMemcache(
764 [issue], key_prefix='nonviewable:')
765
766 # Add a comment to existing issues saying they are now blocking or
767 # blocked on this issue.
768 blocked_add_issues = self.GetIssues(cnxn, issue.blocked_on_iids)
769 for add_issue in blocked_add_issues:
770 self.CreateIssueComment(
771 cnxn, add_issue, reporter_id, content='',
772 amendments=[tracker_bizobj.MakeBlockingAmendment(
773 [(issue.project_name, issue.local_id)], [],
774 default_project_name=add_issue.project_name)])
775 blocking_add_issues = self.GetIssues(cnxn, issue.blocking_iids)
776 for add_issue in blocking_add_issues:
777 self.CreateIssueComment(
778 cnxn, add_issue, reporter_id, content='',
779 amendments=[tracker_bizobj.MakeBlockedOnAmendment(
780 [(issue.project_name, issue.local_id)], [],
781 default_project_name=add_issue.project_name)])
782
783 self._UpdateIssuesModified(
784 cnxn, iids_to_invalidate, modified_timestamp=timestamp)
785
786 if index_now:
787 tracker_fulltext.IndexIssues(
788 cnxn, [issue], services.user, self, self._config_service)
789 else:
790 self.EnqueueIssuesForIndexing(cnxn, [issue.issue_id])
791
792 return issue, comment
793
794 def AllocateNewLocalIDs(self, cnxn, issues):
795 # Filter to just the issues that need new local IDs.
796 issues = [issue for issue in issues if issue.local_id < 0]
797
798 for issue in issues:
799 if issue.local_id < 0:
800 issue.local_id = self.AllocateNextLocalID(cnxn, issue.project_id)
801
802 self.UpdateIssues(cnxn, issues)
803
804 logging.info("AllocateNewLocalIDs")
805
806 def GetAllIssuesInProject(
807 self, cnxn, project_id, min_local_id=None, use_cache=True):
808 """Special query to efficiently get ALL issues in a project.
809
810 This is not done while the user is waiting, only by backround tasks.
811
812 Args:
813 cnxn: connection to SQL database.
814 project_id: the ID of the project.
815 min_local_id: optional int to start at.
816 use_cache: optional boolean to turn off using the cache.
817
818 Returns:
819 A list of Issue protocol buffers for all issues.
820 """
821 all_local_ids = self.GetAllLocalIDsInProject(
822 cnxn, project_id, min_local_id=min_local_id)
823 return self.GetIssuesByLocalIDs(
824 cnxn, project_id, all_local_ids, use_cache=use_cache)
825
826 def GetAnyOnHandIssue(self, issue_ids, start=None, end=None):
827 """Get any one issue from RAM or memcache, otherwise return None."""
828 return self.issue_2lc.GetAnyOnHandItem(issue_ids, start=start, end=end)
829
830 def GetIssuesDict(self, cnxn, issue_ids, use_cache=True, shard_id=None):
831 # type: (MonorailConnection, Collection[int], Optional[Boolean],
832 # Optional[int]) -> (Dict[int, Issue], Sequence[int])
833 """Get a dict {iid: issue} from the DB or cache.
834
835 Returns:
836 A dict {iid: issue} from the DB or cache.
837 A sequence of iid that could not be found.
838 """
839 issue_dict, missed_iids = self.issue_2lc.GetAll(
840 cnxn, issue_ids, use_cache=use_cache, shard_id=shard_id)
841 if not use_cache:
842 for issue in issue_dict.values():
843 issue.assume_stale = False
844 return issue_dict, missed_iids
845
846 def GetIssues(self, cnxn, issue_ids, use_cache=True, shard_id=None):
847 # type: (MonorailConnection, Sequence[int], Optional[Boolean],
848 # Optional[int]) -> (Sequence[int])
849 """Get a list of Issue PBs from the DB or cache.
850
851 Args:
852 cnxn: connection to SQL database.
853 issue_ids: integer global issue IDs of the issues.
854 use_cache: optional boolean to turn off using the cache.
855 shard_id: optional int shard_id to limit retrieval.
856
857 Returns:
858 A list of Issue PBs in the same order as the given issue_ids.
859 """
860 issue_dict, _misses = self.GetIssuesDict(
861 cnxn, issue_ids, use_cache=use_cache, shard_id=shard_id)
862
863 # Return a list that is ordered the same as the given issue_ids.
864 issue_list = [issue_dict[issue_id] for issue_id in issue_ids
865 if issue_id in issue_dict]
866
867 return issue_list
868
869 def GetIssue(self, cnxn, issue_id, use_cache=True):
870 """Get one Issue PB from the DB.
871
872 Args:
873 cnxn: connection to SQL database.
874 issue_id: integer global issue ID of the issue.
875 use_cache: optional boolean to turn off using the cache.
876
877 Returns:
878 The requested Issue protocol buffer.
879
880 Raises:
881 NoSuchIssueException: the issue was not found.
882 """
883 issues = self.GetIssues(cnxn, [issue_id], use_cache=use_cache)
884 try:
885 return issues[0]
886 except IndexError:
887 raise exceptions.NoSuchIssueException()
888
889 def GetIssuesByLocalIDs(
890 self, cnxn, project_id, local_id_list, use_cache=True, shard_id=None):
891 """Get all the requested issues.
892
893 Args:
894 cnxn: connection to SQL database.
895 project_id: int ID of the project to which the issues belong.
896 local_id_list: list of integer local IDs for the requested issues.
897 use_cache: optional boolean to turn off using the cache.
898 shard_id: optional int shard_id to choose a replica.
899
900 Returns:
901 List of Issue PBs for the requested issues. The result Issues
902 will be ordered in the same order as local_id_list.
903 """
904 issue_ids_to_fetch, _misses = self.LookupIssueIDs(
905 cnxn, [(project_id, local_id) for local_id in local_id_list])
906 issues = self.GetIssues(
907 cnxn, issue_ids_to_fetch, use_cache=use_cache, shard_id=shard_id)
908 return issues
909
910 def GetIssueByLocalID(self, cnxn, project_id, local_id, use_cache=True):
911 """Get one Issue PB from the DB.
912
913 Args:
914 cnxn: connection to SQL database.
915 project_id: the ID of the project to which the issue belongs.
916 local_id: integer local ID of the issue.
917 use_cache: optional boolean to turn off using the cache.
918
919 Returns:
920 The requested Issue protocol buffer.
921 """
922 issues = self.GetIssuesByLocalIDs(
923 cnxn, project_id, [local_id], use_cache=use_cache)
924 try:
925 return issues[0]
926 except IndexError:
927 raise exceptions.NoSuchIssueException(
928 'The issue %s:%d does not exist.' % (project_id, local_id))
929
930 def GetOpenAndClosedIssues(self, cnxn, issue_ids):
931 """Return the requested issues in separate open and closed lists.
932
933 Args:
934 cnxn: connection to SQL database.
935 issue_ids: list of int issue issue_ids.
936
937 Returns:
938 A pair of lists, the first with open issues, second with closed issues.
939 """
940 if not issue_ids:
941 return [], [] # make one common case efficient
942
943 issues = self.GetIssues(cnxn, issue_ids)
944 project_ids = {issue.project_id for issue in issues}
945 configs = self._config_service.GetProjectConfigs(cnxn, project_ids)
946 open_issues = []
947 closed_issues = []
948 for issue in issues:
949 config = configs[issue.project_id]
950 if tracker_helpers.MeansOpenInProject(
951 tracker_bizobj.GetStatus(issue), config):
952 open_issues.append(issue)
953 else:
954 closed_issues.append(issue)
955
956 return open_issues, closed_issues
957
958 # TODO(crbug.com/monorail/7822): Delete this method when V0 API retired.
959 def GetCurrentLocationOfMovedIssue(self, cnxn, project_id, local_id):
960 """Return the current location of a moved issue based on old location."""
961 issue_id = int(self.issueformerlocations_tbl.SelectValue(
962 cnxn, 'issue_id', default=0, project_id=project_id, local_id=local_id))
963 if not issue_id:
964 return None, None
965 project_id, local_id = self.issue_tbl.SelectRow(
966 cnxn, cols=['project_id', 'local_id'], id=issue_id)
967 return project_id, local_id
968
969 def GetPreviousLocations(self, cnxn, issue):
970 """Get all the previous locations of an issue."""
971 location_rows = self.issueformerlocations_tbl.Select(
972 cnxn, cols=['project_id', 'local_id'], issue_id=issue.issue_id)
973 locations = [(pid, local_id) for (pid, local_id) in location_rows
974 if pid != issue.project_id or local_id != issue.local_id]
975 return locations
976
977 def GetCommentsByUser(self, cnxn, user_id):
978 """Get all comments created by a user"""
979 comments = self.GetComments(cnxn, commenter_id=user_id,
980 is_description=False, limit=10000)
981 return comments
982
983 def GetIssueActivity(self, cnxn, num=50, before=None, after=None,
984 project_ids=None, user_ids=None, ascending=False):
985
986 if project_ids:
987 use_clause = (
988 'USE INDEX (project_id) USE INDEX FOR ORDER BY (project_id)')
989 elif user_ids:
990 use_clause = (
991 'USE INDEX (commenter_id) USE INDEX FOR ORDER BY (commenter_id)')
992 else:
993 use_clause = ''
994
995 # TODO(jrobbins): make this into a persist method.
996 # TODO(jrobbins): this really needs permission checking in SQL, which
997 # will be slow.
998 where_conds = [('Issue.id = Comment.issue_id', [])]
999 if project_ids is not None:
1000 cond_str = 'Comment.project_id IN (%s)' % sql.PlaceHolders(project_ids)
1001 where_conds.append((cond_str, project_ids))
1002 if user_ids is not None:
1003 cond_str = 'Comment.commenter_id IN (%s)' % sql.PlaceHolders(user_ids)
1004 where_conds.append((cond_str, user_ids))
1005
1006 if before:
1007 where_conds.append(('created < %s', [before]))
1008 if after:
1009 where_conds.append(('created > %s', [after]))
1010 if ascending:
1011 order_by = [('created', [])]
1012 else:
1013 order_by = [('created DESC', [])]
1014
1015 comments = self.GetComments(
1016 cnxn, joins=[('Issue', [])], deleted_by=None, where=where_conds,
1017 use_clause=use_clause, order_by=order_by, limit=num + 1)
1018 return comments
1019
1020 def GetIssueIDsReportedByUser(self, cnxn, user_id):
1021 """Get all issue IDs created by a user"""
1022 rows = self.issue_tbl.Select(cnxn, cols=['id'], reporter_id=user_id,
1023 limit=10000)
1024 return [row[0] for row in rows]
1025
  def InsertIssue(self, cnxn, issue):
    """Store the given issue in SQL.

    The Issue row and all of its related-table rows are written with
    commit=False and committed together at the end, so the issue appears
    atomically.

    Args:
      cnxn: connection to SQL database.
      issue: Issue PB to insert into the database.  issue.issue_id is
          filled in as a side effect.

    Returns:
      The int issue_id of the newly created issue.
    """
    status_id = self._config_service.LookupStatusID(
        cnxn, issue.project_id, issue.status)
    # Tuple order must match ISSUE_COLS[1:] below.
    row = (issue.project_id, issue.local_id, status_id,
           issue.owner_id or None,
           issue.reporter_id,
           issue.opened_timestamp,
           issue.closed_timestamp,
           issue.modified_timestamp,
           issue.owner_modified_timestamp,
           issue.status_modified_timestamp,
           issue.component_modified_timestamp,
           issue.derived_owner_id or None,
           self._config_service.LookupStatusID(
               cnxn, issue.project_id, issue.derived_status),
           bool(issue.deleted),
           issue.star_count, issue.attachment_count,
           issue.is_spam)
    # ISSUE_COLs[1:] to skip setting the ID
    # Insert into the Primary DB.
    generated_ids = self.issue_tbl.InsertRows(
        cnxn, ISSUE_COLS[1:], [row], commit=False, return_generated_ids=True)
    issue_id = generated_ids[0]
    issue.issue_id = issue_id
    # The shard is derived from the auto-generated id, so it can only be
    # filled in with a second statement after the insert.
    self.issue_tbl.Update(
        cnxn, {'shard': issue_id % settings.num_logical_shards},
        id=issue.issue_id, commit=False)

    # Write all related-table rows, deferring the commit to the end.
    self._UpdateIssuesSummary(cnxn, [issue], commit=False)
    self._UpdateIssuesLabels(cnxn, [issue], commit=False)
    self._UpdateIssuesFields(cnxn, [issue], commit=False)
    self._UpdateIssuesComponents(cnxn, [issue], commit=False)
    self._UpdateIssuesCc(cnxn, [issue], commit=False)
    self._UpdateIssuesNotify(cnxn, [issue], commit=False)
    self._UpdateIssuesRelation(cnxn, [issue], commit=False)
    self._UpdateIssuesApprovals(cnxn, issue, commit=False)
    self.chart_service.StoreIssueSnapshots(cnxn, [issue], commit=False)
    cnxn.Commit()
    self._config_service.InvalidateMemcache([issue])

    return issue_id
1076
  def UpdateIssues(
      self, cnxn, issues, update_cols=None, just_derived=False, commit=True,
      invalidate=True):
    """Update the given issues in SQL.

    Args:
      cnxn: connection to SQL database.
      issues: list of issues to update, these must have been loaded with
        use_cache=False so that issue.assume_stale is False.
      update_cols: optional list of just the field names to update.
      just_derived: set to True when only updating derived fields.
      commit: set to False to skip the DB commit and do it in the caller.
      invalidate: set to False to leave cache invalidatation to the caller.
    """
    if not issues:
      return

    for issue in issues:  # slow, but mysql will not allow REPLACE rows.
      assert not issue.assume_stale, (
          'issue2514: Storing issue that might be stale: %r' % issue)
      # Map Issue PB fields onto Issue table columns.
      delta = {
          'project_id': issue.project_id,
          'local_id': issue.local_id,
          'owner_id': issue.owner_id or None,
          'status_id': self._config_service.LookupStatusID(
              cnxn, issue.project_id, issue.status) or None,
          'opened': issue.opened_timestamp,
          'closed': issue.closed_timestamp,
          'modified': issue.modified_timestamp,
          'owner_modified': issue.owner_modified_timestamp,
          'status_modified': issue.status_modified_timestamp,
          'component_modified': issue.component_modified_timestamp,
          'derived_owner_id': issue.derived_owner_id or None,
          'derived_status_id': self._config_service.LookupStatusID(
              cnxn, issue.project_id, issue.derived_status) or None,
          'deleted': bool(issue.deleted),
          'star_count': issue.star_count,
          'attachment_count': issue.attachment_count,
          'is_spam': issue.is_spam,
          }
      if update_cols is not None:
        # Caller asked for a partial update: keep only the named columns.
        delta = {key: val for key, val in delta.items()
                 if key in update_cols}
      self.issue_tbl.Update(cnxn, delta, id=issue.issue_id, commit=False)

    # Related tables are only rewritten on full updates.
    if not update_cols:
      self._UpdateIssuesLabels(cnxn, issues, commit=False)
      self._UpdateIssuesCc(cnxn, issues, commit=False)
      self._UpdateIssuesFields(cnxn, issues, commit=False)
      self._UpdateIssuesComponents(cnxn, issues, commit=False)
      self._UpdateIssuesNotify(cnxn, issues, commit=False)
      if not just_derived:
        self._UpdateIssuesSummary(cnxn, issues, commit=False)
        self._UpdateIssuesRelation(cnxn, issues, commit=False)

    self.chart_service.StoreIssueSnapshots(cnxn, issues, commit=False)

    iids_to_invalidate = [issue.issue_id for issue in issues]
    # Derived-only changes use a lighter invalidation (all shards of a key).
    if just_derived and invalidate:
      self.issue_2lc.InvalidateAllKeys(cnxn, iids_to_invalidate)
    elif invalidate:
      self.issue_2lc.InvalidateKeys(cnxn, iids_to_invalidate)
    if commit:
      cnxn.Commit()
    if invalidate:
      self._config_service.InvalidateMemcache(issues)
1143
1144 def UpdateIssue(
1145 self, cnxn, issue, update_cols=None, just_derived=False, commit=True,
1146 invalidate=True):
1147 """Update the given issue in SQL.
1148
1149 Args:
1150 cnxn: connection to SQL database.
1151 issue: the issue to update.
1152 update_cols: optional list of just the field names to update.
1153 just_derived: set to True when only updating derived fields.
1154 commit: set to False to skip the DB commit and do it in the caller.
1155 invalidate: set to False to leave cache invalidatation to the caller.
1156 """
1157 self.UpdateIssues(
1158 cnxn, [issue], update_cols=update_cols, just_derived=just_derived,
1159 commit=commit, invalidate=invalidate)
1160
1161 def _UpdateIssuesSummary(self, cnxn, issues, commit=True):
1162 """Update the IssueSummary table rows for the given issues."""
1163 self.issuesummary_tbl.InsertRows(
1164 cnxn, ISSUESUMMARY_COLS,
1165 [(issue.issue_id, issue.summary) for issue in issues],
1166 replace=True, commit=commit)
1167
1168 def _UpdateIssuesLabels(self, cnxn, issues, commit=True):
1169 """Update the Issue2Label table rows for the given issues."""
1170 label_rows = []
1171 for issue in issues:
1172 issue_shard = issue.issue_id % settings.num_logical_shards
1173 # TODO(jrobbins): If the user adds many novel labels in one issue update,
1174 # that could be slow. Solution is to add all new labels in a batch first.
1175 label_rows.extend(
1176 (issue.issue_id,
1177 self._config_service.LookupLabelID(cnxn, issue.project_id, label),
1178 False,
1179 issue_shard)
1180 for label in issue.labels)
1181 label_rows.extend(
1182 (issue.issue_id,
1183 self._config_service.LookupLabelID(cnxn, issue.project_id, label),
1184 True,
1185 issue_shard)
1186 for label in issue.derived_labels)
1187
1188 self.issue2label_tbl.Delete(
1189 cnxn, issue_id=[issue.issue_id for issue in issues],
1190 commit=False)
1191 self.issue2label_tbl.InsertRows(
1192 cnxn, ISSUE2LABEL_COLS + ['issue_shard'],
1193 label_rows, ignore=True, commit=commit)
1194
1195 def _UpdateIssuesFields(self, cnxn, issues, commit=True):
1196 """Update the Issue2FieldValue table rows for the given issues."""
1197 fieldvalue_rows = []
1198 for issue in issues:
1199 issue_shard = issue.issue_id % settings.num_logical_shards
1200 for fv in issue.field_values:
1201 fieldvalue_rows.append(
1202 (issue.issue_id, fv.field_id, fv.int_value, fv.str_value,
1203 fv.user_id or None, fv.date_value, fv.url_value, fv.derived,
1204 fv.phase_id or None, issue_shard))
1205
1206 self.issue2fieldvalue_tbl.Delete(
1207 cnxn, issue_id=[issue.issue_id for issue in issues], commit=False)
1208 self.issue2fieldvalue_tbl.InsertRows(
1209 cnxn, ISSUE2FIELDVALUE_COLS + ['issue_shard'],
1210 fieldvalue_rows, commit=commit)
1211
1212 def _UpdateIssuesComponents(self, cnxn, issues, commit=True):
1213 """Update the Issue2Component table rows for the given issues."""
1214 issue2component_rows = []
1215 for issue in issues:
1216 issue_shard = issue.issue_id % settings.num_logical_shards
1217 issue2component_rows.extend(
1218 (issue.issue_id, component_id, False, issue_shard)
1219 for component_id in issue.component_ids)
1220 issue2component_rows.extend(
1221 (issue.issue_id, component_id, True, issue_shard)
1222 for component_id in issue.derived_component_ids)
1223
1224 self.issue2component_tbl.Delete(
1225 cnxn, issue_id=[issue.issue_id for issue in issues], commit=False)
1226 self.issue2component_tbl.InsertRows(
1227 cnxn, ISSUE2COMPONENT_COLS + ['issue_shard'],
1228 issue2component_rows, ignore=True, commit=commit)
1229
1230 def _UpdateIssuesCc(self, cnxn, issues, commit=True):
1231 """Update the Issue2Cc table rows for the given issues."""
1232 cc_rows = []
1233 for issue in issues:
1234 issue_shard = issue.issue_id % settings.num_logical_shards
1235 cc_rows.extend(
1236 (issue.issue_id, cc_id, False, issue_shard)
1237 for cc_id in issue.cc_ids)
1238 cc_rows.extend(
1239 (issue.issue_id, cc_id, True, issue_shard)
1240 for cc_id in issue.derived_cc_ids)
1241
1242 self.issue2cc_tbl.Delete(
1243 cnxn, issue_id=[issue.issue_id for issue in issues], commit=False)
1244 self.issue2cc_tbl.InsertRows(
1245 cnxn, ISSUE2CC_COLS + ['issue_shard'],
1246 cc_rows, ignore=True, commit=commit)
1247
1248 def _UpdateIssuesNotify(self, cnxn, issues, commit=True):
1249 """Update the Issue2Notify table rows for the given issues."""
1250 notify_rows = []
1251 for issue in issues:
1252 derived_rows = [[issue.issue_id, email]
1253 for email in issue.derived_notify_addrs]
1254 notify_rows.extend(derived_rows)
1255
1256 self.issue2notify_tbl.Delete(
1257 cnxn, issue_id=[issue.issue_id for issue in issues], commit=False)
1258 self.issue2notify_tbl.InsertRows(
1259 cnxn, ISSUE2NOTIFY_COLS, notify_rows, ignore=True, commit=commit)
1260
  def _UpdateIssuesRelation(self, cnxn, issues, commit=True):
    """Update the IssueRelation table rows for the given issues.

    Rewrites each issue's outgoing 'blockedon' and 'mergedinto' rows,
    reconciles the incoming 'blockedon' rows implied by issue.blocking_iids,
    and rewrites DanglingRelation rows for external/cross-site references.
    """
    relation_rows = []
    blocking_rows = []
    dangling_relation_rows = []
    for issue in issues:
      for i, dst_issue_id in enumerate(issue.blocked_on_iids):
        rank = issue.blocked_on_ranks[i]
        relation_rows.append((issue.issue_id, dst_issue_id, 'blockedon', rank))
      # 'blocking' is stored inverted: as a 'blockedon' row owned by the
      # other issue.
      for dst_issue_id in issue.blocking_iids:
        blocking_rows.append((dst_issue_id, issue.issue_id, 'blockedon'))
      for dst_ref in issue.dangling_blocked_on_refs:
        if dst_ref.ext_issue_identifier:
          dangling_relation_rows.append((
              issue.issue_id, None, None,
              dst_ref.ext_issue_identifier, 'blockedon'))
        else:
          dangling_relation_rows.append((
              issue.issue_id, dst_ref.project, dst_ref.issue_id,
              None, 'blockedon'))
      for dst_ref in issue.dangling_blocking_refs:
        if dst_ref.ext_issue_identifier:
          dangling_relation_rows.append((
              issue.issue_id, None, None,
              dst_ref.ext_issue_identifier, 'blocking'))
        else:
          # NOTE(review): unlike the 'blockedon' branch above, this row
          # passes dst_ref.ext_issue_identifier (falsy here) rather than
          # None in the external-id column — confirm the asymmetry is
          # intentional.
          dangling_relation_rows.append((
              issue.issue_id, dst_ref.project, dst_ref.issue_id,
              dst_ref.ext_issue_identifier, 'blocking'))
      if issue.merged_into:
        relation_rows.append((
            issue.issue_id, issue.merged_into, 'mergedinto', None))
      if issue.merged_into_external:
        dangling_relation_rows.append((
            issue.issue_id, None, None,
            issue.merged_into_external, 'mergedinto'))

    # Diff existing incoming 'blockedon' rows against the desired set so we
    # only add/remove the rows that actually changed.
    old_blocking = self.issuerelation_tbl.Select(
        cnxn, cols=ISSUERELATION_COLS[:-1],
        dst_issue_id=[issue.issue_id for issue in issues], kind='blockedon')
    # New incoming rows get a 0 appended (presumably the rank column that
    # ISSUERELATION_COLS[:-1] omitted — confirm against the schema).
    relation_rows.extend([
        (row + (0,)) for row in blocking_rows if row not in old_blocking])
    delete_rows = [row for row in old_blocking if row not in blocking_rows]

    for issue_id, dst_issue_id, kind in delete_rows:
      self.issuerelation_tbl.Delete(cnxn, issue_id=issue_id,
          dst_issue_id=dst_issue_id, kind=kind, commit=False)
    # Rows owned by these issues are fully replaced by the recomputed set.
    self.issuerelation_tbl.Delete(
        cnxn, issue_id=[issue.issue_id for issue in issues], commit=False)
    self.issuerelation_tbl.InsertRows(
        cnxn, ISSUERELATION_COLS, relation_rows, ignore=True, commit=commit)
    self.danglingrelation_tbl.Delete(
        cnxn, issue_id=[issue.issue_id for issue in issues], commit=False)
    self.danglingrelation_tbl.InsertRows(
        cnxn, DANGLINGRELATION_COLS, dangling_relation_rows, ignore=True,
        commit=commit)
1317
1318 def _UpdateIssuesModified(
1319 self, cnxn, iids, modified_timestamp=None, invalidate=True):
1320 """Store a modified timestamp for each of the specified issues."""
1321 if not iids:
1322 return
1323 delta = {'modified': modified_timestamp or int(time.time())}
1324 self.issue_tbl.Update(cnxn, delta, id=iids, commit=False)
1325 if invalidate:
1326 self.InvalidateIIDs(cnxn, iids)
1327
1328 def _UpdateIssuesApprovals(self, cnxn, issue, commit=True):
1329 """Update the Issue2ApprovalValue table rows for the given issue."""
1330 self.issue2approvalvalue_tbl.Delete(
1331 cnxn, issue_id=issue.issue_id, commit=commit)
1332 av_rows = [(av.approval_id, issue.issue_id, av.phase_id,
1333 av.status.name.lower(), av.setter_id, av.set_on) for
1334 av in issue.approval_values]
1335 self.issue2approvalvalue_tbl.InsertRows(
1336 cnxn, ISSUE2APPROVALVALUE_COLS, av_rows, commit=commit)
1337
1338 approver_rows = []
1339 for av in issue.approval_values:
1340 approver_rows.extend([(av.approval_id, approver_id, issue.issue_id)
1341 for approver_id in av.approver_ids])
1342 self.issueapproval2approver_tbl.Delete(
1343 cnxn, issue_id=issue.issue_id, commit=commit)
1344 self.issueapproval2approver_tbl.InsertRows(
1345 cnxn, ISSUEAPPROVAL2APPROVER_COLS, approver_rows, commit=commit)
1346
  def UpdateIssueStructure(self, cnxn, config, issue, template, reporter_id,
                           comment_content, commit=True, invalidate=True):
    """Converts the phases and approvals structure of the issue into the
    structure of the given template.

    Args:
      cnxn: connection to SQL database.
      config: ProjectIssueConfig PB holding the approval and field defs.
      issue: Issue PB to restructure; updated in place (RAM and DB).
      template: template PB (presumably a TemplateDef — confirm) whose
          phases and approval values the issue should adopt.
      reporter_id: int user ID credited with the change comment.
      comment_content: string content for the structure-change comment.
      commit: set to False to skip the DB commit and do it in the caller.
      invalidate: set to False to leave cache invalidation to the caller.

    Returns:
      The IssueComment PB created for the structure-change amendment.
    """
    # TODO(jojwang): Remove Field defs that belong to any removed approvals.
    approval_defs_by_id = {ad.approval_id: ad for ad in config.approval_defs}
    issue_avs_by_id = {av.approval_id: av for av in issue.approval_values}

    new_approval_surveys = []
    new_issue_approvals = []

    for template_av in template.approval_values:
      existing_issue_av = issue_avs_by_id.get(template_av.approval_id)

      # Update all approval surveys so latest ApprovalDef survey changes
      # appear in the converted issue's approval values.
      ad = approval_defs_by_id.get(template_av.approval_id)
      new_av_approver_ids = []
      if ad:
        new_av_approver_ids = ad.approver_ids
        new_approval_surveys.append(
            self._MakeIssueComment(
                issue.project_id, reporter_id, ad.survey,
                is_description=True, approval_id=ad.approval_id))
      else:
        logging.info('ApprovalDef not found for approval %r', template_av)

      # Keep approval values as-is if it exists in issue and template
      if existing_issue_av:
        new_av = tracker_bizobj.MakeApprovalValue(
            existing_issue_av.approval_id,
            approver_ids=existing_issue_av.approver_ids,
            status=existing_issue_av.status,
            setter_id=existing_issue_av.setter_id,
            set_on=existing_issue_av.set_on,
            phase_id=template_av.phase_id)
        new_issue_approvals.append(new_av)
      else:
        new_av = tracker_bizobj.MakeApprovalValue(
            template_av.approval_id, approver_ids=new_av_approver_ids,
            status=template_av.status, phase_id=template_av.phase_id)
        new_issue_approvals.append(new_av)

    template_phase_by_name = {
        phase.name.lower(): phase for phase in template.phases}
    issue_phase_by_id = {phase.phase_id: phase for phase in issue.phases}
    updated_fvs = []
    # Trim issue FieldValues or update FieldValue phase_ids
    for fv in issue.field_values:
      # If a fv's phase has the same name as a template's phase, update
      # the fv's phase_id to that of the template phase's. Otherwise,
      # remove the fv.
      if fv.phase_id:
        issue_phase = issue_phase_by_id.get(fv.phase_id)
        if issue_phase and issue_phase.name:
          template_phase = template_phase_by_name.get(issue_phase.name.lower())
          # TODO(jojwang): monorail:4693, remove this after all 'stable-full'
          # gates have been renamed to 'stable'.
          if not template_phase:
            template_phase = template_phase_by_name.get(
                FLT_EQUIVALENT_GATES.get(issue_phase.name.lower()))
          if template_phase:
            fv.phase_id = template_phase.phase_id
            updated_fvs.append(fv)
      # keep all fvs that do not belong to phases.
      else:
        updated_fvs.append(fv)

    # Record both the new and the old approval structure in the amendment.
    fd_names_by_id = {fd.field_id: fd.field_name for fd in config.field_defs}
    amendment = tracker_bizobj.MakeApprovalStructureAmendment(
        [fd_names_by_id.get(av.approval_id) for av in new_issue_approvals],
        [fd_names_by_id.get(av.approval_id) for av in issue.approval_values])

    # Update issue structure in RAM.
    issue.approval_values = new_issue_approvals
    issue.phases = template.phases
    issue.field_values = updated_fvs

    # Update issue structure in DB.
    for survey in new_approval_surveys:
      survey.issue_id = issue.issue_id
      self.InsertComment(cnxn, survey, commit=False)
    self._UpdateIssuesApprovals(cnxn, issue, commit=False)
    self._UpdateIssuesFields(cnxn, [issue], commit=False)
    comment_pb = self.CreateIssueComment(
        cnxn, issue, reporter_id, comment_content,
        amendments=[amendment], commit=False)

    if commit:
      cnxn.Commit()

    if invalidate:
      self.InvalidateIIDs(cnxn, [issue.issue_id])

    return comment_pb
1442
  def DeltaUpdateIssue(
      self, cnxn, services, reporter_id, project_id,
      config, issue, delta, index_now=False, comment=None, attachments=None,
      iids_to_invalidate=None, rules=None, predicate_asts=None,
      is_description=False, timestamp=None, kept_attachments=None,
      importer_id=None, inbound_message=None):
    """Update the issue in the database and return a set of update tuples.

    Args:
      cnxn: connection to SQL database.
      services: connections to persistence layer.
      reporter_id: user ID of the user making this change.
      project_id: int ID for the current project.
      config: ProjectIssueConfig PB for this project.
      issue: Issue PB of issue to update.
      delta: IssueDelta object of fields to update.
      index_now: True if the issue should be updated in the full text index.
      comment: This should be the content of the comment
          corresponding to this change.
      attachments: List [(filename, contents, mimetype),...] of attachments.
      iids_to_invalidate: optional set of issue IDs that need to be invalidated.
          If provided, affected issues will be accumulated here and, the caller
          must call InvalidateIIDs() afterwards.
      rules: optional list of preloaded FilterRule PBs for this project.
      predicate_asts: optional list of QueryASTs for the rules. If rules are
          provided, then predicate_asts should also be provided.
      is_description: True if the comment is a new description for the issue.
      timestamp: int timestamp set during testing, otherwise defaults to
          int(time.time()).
      kept_attachments: This should be a list of int attachment ids for
          attachments kept from previous descriptions, if the comment is
          a change to the issue description
      importer_id: optional ID of user ID for an API client that is importing
          issues and attributing them to other users.
      inbound_message: optional string full text of an email that caused
          this comment to be added.

    Returns:
      A tuple (amendments, comment_pb) with a list of Amendment PBs that
      describe the set of metadata updates that the user made, and the
      resulting IssueComment (or None if no comment was created).
    """
    timestamp = timestamp or int(time.time())
    # Snapshot the pre-delta values so we can detect what actually changed
    # after the delta and filter rules have been applied.
    old_effective_owner = tracker_bizobj.GetOwnerId(issue)
    old_effective_status = tracker_bizobj.GetStatus(issue)
    old_components = set(issue.component_ids)

    logging.info(
        'Bulk edit to project_id %s issue.local_id %s, comment %r',
        project_id, issue.local_id, comment)
    if iids_to_invalidate is None:
      iids_to_invalidate = set([issue.issue_id])
      invalidate = True
    else:
      iids_to_invalidate.add(issue.issue_id)
      invalidate = False  # Caller will do it.

    # Store each updated value in the issue PB, and compute Update PBs
    amendments, impacted_iids = tracker_bizobj.ApplyIssueDelta(
        cnxn, self, issue, delta, config)
    iids_to_invalidate.update(impacted_iids)

    # If this was a no-op with no comment, bail out and don't save,
    # invalidate, or re-index anything.
    if (not amendments and (not comment or not comment.strip()) and
        not attachments):
      logging.info('No amendments, comment, attachments: this is a no-op.')
      return [], None

    # Note: no need to check for collisions when the user is doing a delta.

    # update the modified_timestamp for any comment added, even if it was
    # just a text comment with no issue fields changed.
    issue.modified_timestamp = timestamp

    # Update the closed timestamp before filter rules so that rules
    # can test for closed_timestamp, and also after filter rules
    # so that closed_timestamp will be set if the issue is closed by the rule.
    tracker_helpers.UpdateClosedTimestamp(config, issue, old_effective_status)
    if rules is None:
      logging.info('Rules were not given')
      rules = services.features.GetFilterRules(cnxn, config.project_id)
      predicate_asts = filterrules_helpers.ParsePredicateASTs(
          rules, config, [])

    filterrules_helpers.ApplyGivenRules(
        cnxn, services, issue, config, rules, predicate_asts)
    tracker_helpers.UpdateClosedTimestamp(config, issue, old_effective_status)
    # Bump the per-facet modified timestamps only for facets that changed.
    if old_effective_owner != tracker_bizobj.GetOwnerId(issue):
      issue.owner_modified_timestamp = timestamp
    if old_effective_status != tracker_bizobj.GetStatus(issue):
      issue.status_modified_timestamp = timestamp
    if old_components != set(issue.component_ids):
      issue.component_modified_timestamp = timestamp

    # Store the issue in SQL.
    self.UpdateIssue(cnxn, issue, commit=False, invalidate=False)

    comment_pb = self.CreateIssueComment(
        cnxn, issue, reporter_id, comment, amendments=amendments,
        is_description=is_description, attachments=attachments, commit=False,
        kept_attachments=kept_attachments, timestamp=timestamp,
        importer_id=importer_id, inbound_message=inbound_message)
    self._UpdateIssuesModified(
        cnxn, iids_to_invalidate, modified_timestamp=issue.modified_timestamp,
        invalidate=invalidate)

    # Add a comment to the newly added issues saying they are now blocking
    # this issue.
    for add_issue in self.GetIssues(cnxn, delta.blocked_on_add):
      self.CreateIssueComment(
          cnxn, add_issue, reporter_id, content='',
          amendments=[tracker_bizobj.MakeBlockingAmendment(
              [(issue.project_name, issue.local_id)], [],
              default_project_name=add_issue.project_name)],
          timestamp=timestamp, importer_id=importer_id)
    # Add a comment to the newly removed issues saying they are no longer
    # blocking this issue.
    for remove_issue in self.GetIssues(cnxn, delta.blocked_on_remove):
      self.CreateIssueComment(
          cnxn, remove_issue, reporter_id, content='',
          amendments=[tracker_bizobj.MakeBlockingAmendment(
              [], [(issue.project_name, issue.local_id)],
              default_project_name=remove_issue.project_name)],
          timestamp=timestamp, importer_id=importer_id)

    # Add a comment to the newly added issues saying they are now blocked on
    # this issue.
    for add_issue in self.GetIssues(cnxn, delta.blocking_add):
      self.CreateIssueComment(
          cnxn, add_issue, reporter_id, content='',
          amendments=[tracker_bizobj.MakeBlockedOnAmendment(
              [(issue.project_name, issue.local_id)], [],
              default_project_name=add_issue.project_name)],
          timestamp=timestamp, importer_id=importer_id)
    # Add a comment to the newly removed issues saying they are no longer
    # blocked on this issue.
    for remove_issue in self.GetIssues(cnxn, delta.blocking_remove):
      self.CreateIssueComment(
          cnxn, remove_issue, reporter_id, content='',
          amendments=[tracker_bizobj.MakeBlockedOnAmendment(
              [], [(issue.project_name, issue.local_id)],
              default_project_name=remove_issue.project_name)],
          timestamp=timestamp, importer_id=importer_id)

    # NOTE(review): the commit is skipped when invalidate is True —
    # presumably the invalidation path above commits as a side effect;
    # confirm this is intentional.
    if not invalidate:
      cnxn.Commit()

    if index_now:
      # NOTE(review): the issue-creation path above passes services.user
      # here, not services.user_service — confirm which attribute is the
      # intended one.
      tracker_fulltext.IndexIssues(
          cnxn, [issue], services.user_service, self, self._config_service)
    else:
      self.EnqueueIssuesForIndexing(cnxn, [issue.issue_id])

    return amendments, comment_pb
1598
1599 def InvalidateIIDs(self, cnxn, iids_to_invalidate):
1600 """Invalidate the specified issues in the Invalidate table and memcache."""
1601 issues_to_invalidate = self.GetIssues(cnxn, iids_to_invalidate)
1602 self.InvalidateIssues(cnxn, issues_to_invalidate)
1603
1604 def InvalidateIssues(self, cnxn, issues):
1605 """Invalidate the specified issues in the Invalidate table and memcache."""
1606 iids = [issue.issue_id for issue in issues]
1607 self.issue_2lc.InvalidateKeys(cnxn, iids)
1608 self._config_service.InvalidateMemcache(issues)
1609
1610 def RelateIssues(self, cnxn, issue_relation_dict, commit=True):
1611 """Update the IssueRelation table rows for the given relationships.
1612
1613 issue_relation_dict is a mapping of 'source' issues to 'destination' issues,
1614 paired with the kind of relationship connecting the two.
1615 """
1616 relation_rows = []
1617 for src_iid, dests in issue_relation_dict.items():
1618 for dst_iid, kind in dests:
1619 if kind == 'blocking':
1620 relation_rows.append((dst_iid, src_iid, 'blockedon', 0))
1621 elif kind == 'blockedon':
1622 relation_rows.append((src_iid, dst_iid, 'blockedon', 0))
1623 elif kind == 'mergedinto':
1624 relation_rows.append((src_iid, dst_iid, 'mergedinto', None))
1625
1626 self.issuerelation_tbl.InsertRows(
1627 cnxn, ISSUERELATION_COLS, relation_rows, ignore=True, commit=commit)
1628
  def CopyIssues(self, cnxn, dest_project, issues, user_service, copier_id):
    """Copy the given issues into the destination project.

    Only summary, labels, field values, and blocked-on links are carried
    over; blocking and merged-into relationships are not copied.  Each copy
    gets a fresh local ID in the destination project and a new description
    comment cloned (with attachments) from the source issue's description.

    Args:
      cnxn: connection to SQL database.
      dest_project: Project PB of the project that receives the copies.
      issues: list of Issue PBs to copy; each must have been loaded fresh
        (assume_stale == False) or the assert below fires.
      user_service: persistence layer for users, used when indexing.
      copier_id: int user ID recorded as the reporter of each new issue.

    Returns:
      A list of the newly created Issue PBs.
    """
    created_issues = []
    iids_to_invalidate = set()

    for target_issue in issues:
      assert not target_issue.assume_stale, (
          'issue2514: Copying issue that might be stale: %r' % target_issue)
      new_issue = tracker_pb2.Issue()
      new_issue.project_id = dest_project.project_id
      new_issue.project_name = dest_project.project_name
      new_issue.summary = target_issue.summary
      new_issue.labels.extend(target_issue.labels)
      new_issue.field_values.extend(target_issue.field_values)
      # The copier, not the original reporter, is credited with the copy.
      new_issue.reporter_id = copier_id

      timestamp = int(time.time())
      new_issue.opened_timestamp = timestamp
      new_issue.modified_timestamp = timestamp

      # The first comment on an issue is its description.
      target_comments = self.GetCommentsForIssue(cnxn, target_issue.issue_id)
      initial_summary_comment = target_comments[0]

      # Note that blocking and merge_into are not copied.
      if target_issue.blocked_on_iids:
        blocked_on = target_issue.blocked_on_iids
        iids_to_invalidate.update(blocked_on)
        new_issue.blocked_on_iids = blocked_on

      # Gather list of attachments from the target issue's summary comment.
      # MakeIssueComments expects a list of [(filename, contents, mimetype),...]
      attachments = []
      for attachment in initial_summary_comment.attachments:
        # Read each attachment's bytes back out of Cloud Storage so the copy
        # gets its own stored objects.
        object_path = ('/' + app_identity.get_default_gcs_bucket_name() +
                       attachment.gcs_object_id)
        with cloudstorage.open(object_path, 'r') as f:
          content = f.read()
          attachments.append(
              [attachment.filename, content, attachment.mimetype])

      if attachments:
        new_issue.attachment_count = len(attachments)

      # Create the same summary comment as the target issue.
      comment = self._MakeIssueComment(
          dest_project.project_id, copier_id, initial_summary_comment.content,
          attachments=attachments, timestamp=timestamp, is_description=True)

      new_issue.local_id = self.AllocateNextLocalID(
          cnxn, dest_project.project_id)
      issue_id = self.InsertIssue(cnxn, new_issue)
      comment.issue_id = issue_id
      self.InsertComment(cnxn, comment)

      # Restricted issues must not linger in the nonviewable cache.
      if permissions.HasRestrictions(new_issue, 'view'):
        self._config_service.InvalidateMemcache(
            [new_issue], key_prefix='nonviewable:')

      tracker_fulltext.IndexIssues(
          cnxn, [new_issue], user_service, self, self._config_service)
      created_issues.append(new_issue)

    # The referenced issues are all modified when the relationship is added.
    self._UpdateIssuesModified(
        cnxn, iids_to_invalidate, modified_timestamp=timestamp)

    return created_issues
1696
  def MoveIssues(self, cnxn, dest_project, issues, user_service):
    """Move the given issues into the destination project.

    Issues that previously lived in the destination project get back their
    old local IDs there; all others are assigned fresh local IDs.  The old
    (project_id, local_id) locations are recorded so stale links can still
    be redirected.

    Args:
      cnxn: connection to SQL database.
      dest_project: Project PB of the project to move the issues into.
      issues: list of Issue PBs to move; mutated in place with their new
        project and local IDs.
      user_service: persistence layer for users, used when indexing.

    Returns:
      A set of issue_ids that were moved *back* to a former location in
      the destination project (and so reuse their old local IDs).
    """
    old_location_rows = [
        (issue.issue_id, issue.project_id, issue.local_id)
        for issue in issues]
    moved_back_iids = set()

    # Look up which of these issues once lived in the destination project.
    former_locations_in_project = self.issueformerlocations_tbl.Select(
        cnxn, cols=ISSUEFORMERLOCATIONS_COLS,
        project_id=dest_project.project_id,
        issue_id=[issue.issue_id for issue in issues])
    former_locations = {
        issue_id: local_id
        for issue_id, project_id, local_id in former_locations_in_project}

    # Remove the issue id from issue_id_2lc so that it does not stay
    # around in cache and memcache.
    # The Key of IssueIDTwoLevelCache is (project_id, local_id).
    self.issue_id_2lc.InvalidateKeys(
        cnxn, [(issue.project_id, issue.local_id) for issue in issues])
    self.InvalidateIssues(cnxn, issues)

    for issue in issues:
      if issue.issue_id in former_locations:
        # Returning issue: reuse the local ID it had here before.
        dest_id = former_locations[issue.issue_id]
        moved_back_iids.add(issue.issue_id)
      else:
        dest_id = self.AllocateNextLocalID(cnxn, dest_project.project_id)

      issue.local_id = dest_id
      issue.project_id = dest_project.project_id
      issue.project_name = dest_project.project_name

    # Rewrite each whole issue so that status and label IDs are looked up
    # in the context of the destination project.
    self.UpdateIssues(cnxn, issues)

    # Comments also have the project_id because it is needed for an index.
    self.comment_tbl.Update(
        cnxn, {'project_id': dest_project.project_id},
        issue_id=[issue.issue_id for issue in issues], commit=False)

    # Record old locations so that we can offer links if the user looks there.
    self.issueformerlocations_tbl.InsertRows(
        cnxn, ISSUEFORMERLOCATIONS_COLS, old_location_rows, ignore=True,
        commit=False)
    cnxn.Commit()

    tracker_fulltext.IndexIssues(
        cnxn, issues, user_service, self, self._config_service)

    return moved_back_iids
1749
1750 def ExpungeFormerLocations(self, cnxn, project_id):
1751 """Delete history of issues that were in this project but moved out."""
1752 self.issueformerlocations_tbl.Delete(cnxn, project_id=project_id)
1753
1754 def ExpungeIssues(self, cnxn, issue_ids):
1755 """Completely delete the specified issues from the database."""
1756 logging.info('expunging the issues %r', issue_ids)
1757 tracker_fulltext.UnindexIssues(issue_ids)
1758
1759 remaining_iids = issue_ids[:]
1760
1761 # Note: these are purposely not done in a transaction to allow
1762 # incremental progress in what might be a very large change.
1763 # We are not concerned about non-atomic deletes because all
1764 # this data will be gone eventually anyway.
1765 while remaining_iids:
1766 iids_in_chunk = remaining_iids[:CHUNK_SIZE]
1767 remaining_iids = remaining_iids[CHUNK_SIZE:]
1768 self.issuesummary_tbl.Delete(cnxn, issue_id=iids_in_chunk)
1769 self.issue2label_tbl.Delete(cnxn, issue_id=iids_in_chunk)
1770 self.issue2component_tbl.Delete(cnxn, issue_id=iids_in_chunk)
1771 self.issue2cc_tbl.Delete(cnxn, issue_id=iids_in_chunk)
1772 self.issue2notify_tbl.Delete(cnxn, issue_id=iids_in_chunk)
1773 self.issueupdate_tbl.Delete(cnxn, issue_id=iids_in_chunk)
1774 self.attachment_tbl.Delete(cnxn, issue_id=iids_in_chunk)
1775 self.comment_tbl.Delete(cnxn, issue_id=iids_in_chunk)
1776 self.issuerelation_tbl.Delete(cnxn, issue_id=iids_in_chunk)
1777 self.issuerelation_tbl.Delete(cnxn, dst_issue_id=iids_in_chunk)
1778 self.danglingrelation_tbl.Delete(cnxn, issue_id=iids_in_chunk)
1779 self.issueformerlocations_tbl.Delete(cnxn, issue_id=iids_in_chunk)
1780 self.reindexqueue_tbl.Delete(cnxn, issue_id=iids_in_chunk)
1781 self.issue_tbl.Delete(cnxn, id=iids_in_chunk)
1782
1783 def SoftDeleteIssue(self, cnxn, project_id, local_id, deleted, user_service):
1784 """Set the deleted boolean on the indicated issue and store it.
1785
1786 Args:
1787 cnxn: connection to SQL database.
1788 project_id: int project ID for the current project.
1789 local_id: int local ID of the issue to freeze/unfreeze.
1790 deleted: boolean, True to soft-delete, False to undelete.
1791 user_service: persistence layer for users, used to lookup user IDs.
1792 """
1793 issue = self.GetIssueByLocalID(cnxn, project_id, local_id, use_cache=False)
1794 issue.deleted = deleted
1795 self.UpdateIssue(cnxn, issue, update_cols=['deleted'])
1796 tracker_fulltext.IndexIssues(
1797 cnxn, [issue], user_service, self, self._config_service)
1798
1799 def DeleteComponentReferences(self, cnxn, component_id):
1800 """Delete any references to the specified component."""
1801 # TODO(jrobbins): add tasks to re-index any affected issues.
1802 # Note: if this call fails, some data could be left
1803 # behind, but it would not be displayed, and it could always be
1804 # GC'd from the DB later.
1805 self.issue2component_tbl.Delete(cnxn, component_id=component_id)
1806
1807 ### Local ID generation
1808
1809 def InitializeLocalID(self, cnxn, project_id):
1810 """Initialize the local ID counter for the specified project to zero.
1811
1812 Args:
1813 cnxn: connection to SQL database.
1814 project_id: int ID of the project.
1815 """
1816 self.localidcounter_tbl.InsertRow(
1817 cnxn, project_id=project_id, used_local_id=0, used_spam_id=0)
1818
1819 def SetUsedLocalID(self, cnxn, project_id):
1820 """Set the local ID counter based on existing issues.
1821
1822 Args:
1823 cnxn: connection to SQL database.
1824 project_id: int ID of the project.
1825 """
1826 highest_id = self.GetHighestLocalID(cnxn, project_id)
1827 self.localidcounter_tbl.InsertRow(
1828 cnxn, replace=True, used_local_id=highest_id, project_id=project_id)
1829 return highest_id
1830
1831 def AllocateNextLocalID(self, cnxn, project_id):
1832 """Return the next available issue ID in the specified project.
1833
1834 Args:
1835 cnxn: connection to SQL database.
1836 project_id: int ID of the project.
1837
1838 Returns:
1839 The next local ID.
1840 """
1841 try:
1842 next_local_id = self.localidcounter_tbl.IncrementCounterValue(
1843 cnxn, 'used_local_id', project_id=project_id)
1844 except AssertionError as e:
1845 logging.info('exception incrementing local_id counter: %s', e)
1846 next_local_id = self.SetUsedLocalID(cnxn, project_id) + 1
1847 return next_local_id
1848
1849 def GetHighestLocalID(self, cnxn, project_id):
1850 """Return the highest used issue ID in the specified project.
1851
1852 Args:
1853 cnxn: connection to SQL database.
1854 project_id: int ID of the project.
1855
1856 Returns:
1857 The highest local ID for an active or moved issues.
1858 """
1859 highest = self.issue_tbl.SelectValue(
1860 cnxn, 'MAX(local_id)', project_id=project_id)
1861 highest = highest or 0 # It will be None if the project has no issues.
1862 highest_former = self.issueformerlocations_tbl.SelectValue(
1863 cnxn, 'MAX(local_id)', project_id=project_id)
1864 highest_former = highest_former or 0
1865 return max(highest, highest_former)
1866
1867 def GetAllLocalIDsInProject(self, cnxn, project_id, min_local_id=None):
1868 """Return the list of local IDs only, not the actual issues.
1869
1870 Args:
1871 cnxn: connection to SQL database.
1872 project_id: the ID of the project to which the issue belongs.
1873 min_local_id: point to start at.
1874
1875 Returns:
1876 A range object of local IDs from 1 to N, or from min_local_id to N. It
1877 may be the case that some of those local IDs are no longer used, e.g.,
1878 if some issues were moved out of this project.
1879 """
1880 if not min_local_id:
1881 min_local_id = 1
1882 highest_local_id = self.GetHighestLocalID(cnxn, project_id)
1883 return list(range(min_local_id, highest_local_id + 1))
1884
1885 def ExpungeLocalIDCounters(self, cnxn, project_id):
1886 """Delete history of local ids that were in this project."""
1887 self.localidcounter_tbl.Delete(cnxn, project_id=project_id)
1888
1889 ### Comments
1890
1891 def _UnpackComment(
1892 self, comment_row, content_dict, inbound_message_dict, approval_dict,
1893 importer_dict):
1894 """Partially construct a Comment PB from a DB row."""
1895 (comment_id, issue_id, created, project_id, commenter_id,
1896 deleted_by, is_spam, is_description, commentcontent_id) = comment_row
1897 comment = tracker_pb2.IssueComment()
1898 comment.id = comment_id
1899 comment.issue_id = issue_id
1900 comment.timestamp = created
1901 comment.project_id = project_id
1902 comment.user_id = commenter_id
1903 comment.content = content_dict.get(commentcontent_id, '')
1904 comment.inbound_message = inbound_message_dict.get(commentcontent_id, '')
1905 comment.deleted_by = deleted_by or 0
1906 comment.is_spam = bool(is_spam)
1907 comment.is_description = bool(is_description)
1908 comment.approval_id = approval_dict.get(comment_id)
1909 comment.importer_id = importer_dict.get(comment_id)
1910 return comment
1911
1912 def _UnpackAmendment(self, amendment_row):
1913 """Construct an Amendment PB from a DB row."""
1914 (_id, _issue_id, comment_id, field_name,
1915 old_value, new_value, added_user_id, removed_user_id,
1916 custom_field_name) = amendment_row
1917 amendment = tracker_pb2.Amendment()
1918 field_enum = tracker_pb2.FieldID(field_name.upper())
1919 amendment.field = field_enum
1920
1921 # TODO(jrobbins): display old values in more cases.
1922 if new_value is not None:
1923 amendment.newvalue = new_value
1924 if old_value is not None:
1925 amendment.oldvalue = old_value
1926 if added_user_id:
1927 amendment.added_user_ids.append(added_user_id)
1928 if removed_user_id:
1929 amendment.removed_user_ids.append(removed_user_id)
1930 if custom_field_name:
1931 amendment.custom_field_name = custom_field_name
1932 return amendment, comment_id
1933
1934 def _ConsolidateAmendments(self, amendments):
1935 """Consoliodate amendments of the same field in one comment into one
1936 amendment PB."""
1937
1938 fields_dict = {}
1939 result = []
1940
1941 for amendment in amendments:
1942 key = amendment.field, amendment.custom_field_name
1943 fields_dict.setdefault(key, []).append(amendment)
1944 for (field, _custom_name), sorted_amendments in sorted(fields_dict.items()):
1945 new_amendment = tracker_pb2.Amendment()
1946 new_amendment.field = field
1947 for amendment in sorted_amendments:
1948 if amendment.newvalue is not None:
1949 if new_amendment.newvalue is not None:
1950 # NOTE: see crbug/monorail/8272. BLOCKEDON and BLOCKING changes
1951 # are all stored in newvalue e.g. (newvalue = -b/123 b/124) and
1952 # external bugs and monorail bugs are stored in separate amendments.
1953 # Without this, the values of external bug amendments and monorail
1954 # blocker bug amendments may overwrite each other.
1955 new_amendment.newvalue += (' ' + amendment.newvalue)
1956 else:
1957 new_amendment.newvalue = amendment.newvalue
1958 if amendment.oldvalue is not None:
1959 new_amendment.oldvalue = amendment.oldvalue
1960 if amendment.added_user_ids:
1961 new_amendment.added_user_ids.extend(amendment.added_user_ids)
1962 if amendment.removed_user_ids:
1963 new_amendment.removed_user_ids.extend(amendment.removed_user_ids)
1964 if amendment.custom_field_name:
1965 new_amendment.custom_field_name = amendment.custom_field_name
1966 result.append(new_amendment)
1967 return result
1968
1969 def _UnpackAttachment(self, attachment_row):
1970 """Construct an Attachment PB from a DB row."""
1971 (attachment_id, _issue_id, comment_id, filename, filesize, mimetype,
1972 deleted, gcs_object_id) = attachment_row
1973 attach = tracker_pb2.Attachment()
1974 attach.attachment_id = attachment_id
1975 attach.filename = filename
1976 attach.filesize = filesize
1977 attach.mimetype = mimetype
1978 attach.deleted = bool(deleted)
1979 attach.gcs_object_id = gcs_object_id
1980 return attach, comment_id
1981
1982 def _DeserializeComments(
1983 self, comment_rows, commentcontent_rows, amendment_rows, attachment_rows,
1984 approval_rows, importer_rows):
1985 """Turn rows into IssueComment PBs."""
1986 results = [] # keep objects in the same order as the rows
1987 results_dict = {} # for fast access when joining.
1988
1989 content_dict = dict(
1990 (commentcontent_id, content) for
1991 commentcontent_id, content, _ in commentcontent_rows)
1992 inbound_message_dict = dict(
1993 (commentcontent_id, inbound_message) for
1994 commentcontent_id, _, inbound_message in commentcontent_rows)
1995 approval_dict = dict(
1996 (comment_id, approval_id) for approval_id, comment_id in
1997 approval_rows)
1998 importer_dict = dict(importer_rows)
1999
2000 for comment_row in comment_rows:
2001 comment = self._UnpackComment(
2002 comment_row, content_dict, inbound_message_dict, approval_dict,
2003 importer_dict)
2004 results.append(comment)
2005 results_dict[comment.id] = comment
2006
2007 for amendment_row in amendment_rows:
2008 amendment, comment_id = self._UnpackAmendment(amendment_row)
2009 try:
2010 results_dict[comment_id].amendments.extend([amendment])
2011 except KeyError:
2012 logging.error('Found amendment for missing comment: %r', comment_id)
2013
2014 for attachment_row in attachment_rows:
2015 attach, comment_id = self._UnpackAttachment(attachment_row)
2016 try:
2017 results_dict[comment_id].attachments.append(attach)
2018 except KeyError:
2019 logging.error('Found attachment for missing comment: %r', comment_id)
2020
2021 for c in results:
2022 c.amendments = self._ConsolidateAmendments(c.amendments)
2023
2024 return results
2025
2026 # TODO(jrobbins): make this a private method and expose just the interface
2027 # needed by activities.py.
2028 def GetComments(
2029 self, cnxn, where=None, order_by=None, content_only=False, **kwargs):
2030 """Retrieve comments from SQL."""
2031 shard_id = sql.RandomShardID()
2032 order_by = order_by or [('created', [])]
2033 comment_rows = self.comment_tbl.Select(
2034 cnxn, cols=COMMENT_COLS, where=where,
2035 order_by=order_by, shard_id=shard_id, **kwargs)
2036 cids = [row[0] for row in comment_rows]
2037 commentcontent_ids = [row[-1] for row in comment_rows]
2038 content_rows = self.commentcontent_tbl.Select(
2039 cnxn, cols=COMMENTCONTENT_COLS, id=commentcontent_ids,
2040 shard_id=shard_id)
2041 approval_rows = self.issueapproval2comment_tbl.Select(
2042 cnxn, cols=ISSUEAPPROVAL2COMMENT_COLS, comment_id=cids)
2043 amendment_rows = []
2044 attachment_rows = []
2045 importer_rows = []
2046 if not content_only:
2047 amendment_rows = self.issueupdate_tbl.Select(
2048 cnxn, cols=ISSUEUPDATE_COLS, comment_id=cids, shard_id=shard_id)
2049 attachment_rows = self.attachment_tbl.Select(
2050 cnxn, cols=ATTACHMENT_COLS, comment_id=cids, shard_id=shard_id)
2051 importer_rows = self.commentimporter_tbl.Select(
2052 cnxn, cols=COMMENTIMPORTER_COLS, comment_id=cids, shard_id=shard_id)
2053
2054 comments = self._DeserializeComments(
2055 comment_rows, content_rows, amendment_rows, attachment_rows,
2056 approval_rows, importer_rows)
2057 return comments
2058
2059 def GetComment(self, cnxn, comment_id):
2060 """Get the requested comment, or raise an exception."""
2061 comments = self.GetComments(cnxn, id=comment_id)
2062 try:
2063 return comments[0]
2064 except IndexError:
2065 raise exceptions.NoSuchCommentException()
2066
2067 def GetCommentsForIssue(self, cnxn, issue_id):
2068 """Return all IssueComment PBs for the specified issue.
2069
2070 Args:
2071 cnxn: connection to SQL database.
2072 issue_id: int global ID of the issue.
2073
2074 Returns:
2075 A list of the IssueComment protocol buffers for the description
2076 and comments on this issue.
2077 """
2078 comments = self.GetComments(cnxn, issue_id=[issue_id])
2079 for i, comment in enumerate(comments):
2080 comment.sequence = i
2081
2082 return comments
2083
2084
  def GetCommentsByID(self, cnxn, comment_ids, sequences, use_cache=True,
      shard_id=None):
    """Return all IssueComment PBs by comment ids.

    Args:
      cnxn: connection to SQL database.
      comment_ids: a list of comment ids.
      sequences: sequence numbers to assign, parallel to comment_ids.
      use_cache: optional boolean to enable the cache.
      shard_id: optional int shard_id to limit retrieval.

    Returns:
      A list of the IssueComment protocol buffers for comment_ids,
      sorted by timestamp.
    """
    # Try loading issue comments from a random shard to reduce load on
    # primary DB.
    if shard_id is None:
      shard_id = sql.RandomShardID()

    # NOTE(review): _missed_comments is discarded; if any requested id is
    # not found, the sequence-assignment loop below raises IndexError --
    # confirm callers guarantee every id exists.
    comment_dict, _missed_comments = self.comment_2lc.GetAll(cnxn, comment_ids,
        use_cache=use_cache, shard_id=shard_id)

    comments = sorted(list(comment_dict.values()), key=lambda x: x.timestamp)

    # NOTE(review): sequences is indexed by the order of comment_ids, but
    # comments were just re-sorted by timestamp; this assumes the two orders
    # agree -- verify against callers.
    for i in range(len(comment_ids)):
      comments[i].sequence = sequences[i]

    return comments
2113
2114 # TODO(jrobbins): remove this method because it is too slow when an issue
2115 # has a huge number of comments.
2116 def GetCommentsForIssues(self, cnxn, issue_ids, content_only=False):
2117 """Return all IssueComment PBs for each issue ID in the given list.
2118
2119 Args:
2120 cnxn: connection to SQL database.
2121 issue_ids: list of integer global issue IDs.
2122 content_only: optional boolean, set true for faster loading of
2123 comment content without attachments and amendments.
2124
2125 Returns:
2126 Dict {issue_id: [IssueComment, ...]} with IssueComment protocol
2127 buffers for the description and comments on each issue.
2128 """
2129 comments = self.GetComments(
2130 cnxn, issue_id=issue_ids, content_only=content_only)
2131
2132 comments_dict = collections.defaultdict(list)
2133 for comment in comments:
2134 comment.sequence = len(comments_dict[comment.issue_id])
2135 comments_dict[comment.issue_id].append(comment)
2136
2137 return comments_dict
2138
  def InsertComment(self, cnxn, comment, commit=True):
    """Store the given issue comment in SQL.

    Args:
      cnxn: connection to SQL database.
      comment: IssueComment PB to insert into the database; comment.id is
        set to the new Comment row ID as a side effect.
      commit: set to False to avoid doing the commit for now.
    """
    # The comment body and any inbound email text live in a separate
    # CommentContent row that the Comment row references by id.
    commentcontent_id = self.commentcontent_tbl.InsertRow(
        cnxn, content=comment.content,
        inbound_message=comment.inbound_message, commit=False)
    comment_id = self.comment_tbl.InsertRow(
        cnxn, issue_id=comment.issue_id, created=comment.timestamp,
        project_id=comment.project_id,
        commenter_id=comment.user_id,
        deleted_by=comment.deleted_by or None,
        is_spam=comment.is_spam, is_description=comment.is_description,
        commentcontent_id=commentcontent_id,
        commit=False)
    comment.id = comment_id
    if comment.importer_id:
      # NOTE(review): this InsertRow does not pass commit=False, so it may
      # commit earlier than the caller's commit argument requested --
      # confirm this is intentional.
      self.commentimporter_tbl.InsertRow(
          cnxn, comment_id=comment_id, importer_id=comment.importer_id)

    # Flatten each Amendment PB into one or more IssueUpdate rows: one for
    # a plain value change, plus one per added or removed user.
    amendment_rows = []
    for amendment in comment.amendments:
      field_enum = str(amendment.field).lower()
      if (amendment.get_assigned_value('newvalue') is not None and
          not amendment.added_user_ids and not amendment.removed_user_ids):
        amendment_rows.append((
            comment.issue_id, comment_id, field_enum,
            amendment.oldvalue, amendment.newvalue,
            None, None, amendment.custom_field_name))
      for added_user_id in amendment.added_user_ids:
        amendment_rows.append((
            comment.issue_id, comment_id, field_enum, None, None,
            added_user_id, None, amendment.custom_field_name))
      for removed_user_id in amendment.removed_user_ids:
        amendment_rows.append((
            comment.issue_id, comment_id, field_enum, None, None,
            None, removed_user_id, amendment.custom_field_name))
    # ISSUEUPDATE_COLS[1:] to skip id column.
    self.issueupdate_tbl.InsertRows(
        cnxn, ISSUEUPDATE_COLS[1:], amendment_rows, commit=False)

    attachment_rows = []
    for attach in comment.attachments:
      attachment_rows.append([
          comment.issue_id, comment.id, attach.filename, attach.filesize,
          attach.mimetype, attach.deleted, attach.gcs_object_id])
    self.attachment_tbl.InsertRows(
        cnxn, ATTACHMENT_COLS[1:], attachment_rows, commit=False)

    if comment.approval_id:
      self.issueapproval2comment_tbl.InsertRows(
          cnxn, ISSUEAPPROVAL2COMMENT_COLS,
          [(comment.approval_id, comment_id)], commit=False)

    if commit:
      cnxn.Commit()
2199
2200 def _UpdateComment(self, cnxn, comment, update_cols=None):
2201 """Update the given issue comment in SQL.
2202
2203 Args:
2204 cnxn: connection to SQL database.
2205 comment: IssueComment PB to update in the database.
2206 update_cols: optional list of just the field names to update.
2207 """
2208 delta = {
2209 'commenter_id': comment.user_id,
2210 'deleted_by': comment.deleted_by or None,
2211 'is_spam': comment.is_spam,
2212 }
2213 if update_cols is not None:
2214 delta = {key: val for key, val in delta.items()
2215 if key in update_cols}
2216
2217 self.comment_tbl.Update(cnxn, delta, id=comment.id)
2218 self.comment_2lc.InvalidateKeys(cnxn, [comment.id])
2219
  def _MakeIssueComment(
      self, project_id, user_id, content, inbound_message=None,
      amendments=None, attachments=None, kept_attachments=None, timestamp=None,
      is_spam=False, is_description=False, approval_id=None, importer_id=None):
    """Create in IssueComment protocol buffer in RAM.

    Args:
      project_id: Project with the issue.
      user_id: the user ID of the user who entered the comment.
      content: string body of the comment.
      inbound_message: optional string full text of an email that
          caused this comment to be added.
      amendments: list of Amendment PBs describing the
          metadata changes that the user made along w/ comment.
      attachments: [(filename, contents, mimetype),...] attachments uploaded at
          the time the comment was made.
      kept_attachments: list of Attachment PBs for attachments kept from
          previous descriptions, if the comment is a description
      timestamp: time at which the comment was made, defaults to now.
      is_spam: True if the comment was classified as spam.
      is_description: True if the comment is a description for the issue.
      approval_id: id, if any, of the APPROVAL_TYPE FieldDef this comment
          belongs to.
      importer_id: optional User ID of script that imported the comment on
          behalf of a user.

    Returns:
      The new IssueComment protocol buffer.

    The content may have some markup done during input processing.

    Any attachments are immediately stored in GCS; only the Comment row
    itself is deferred to InsertComment.
    """
    comment = tracker_pb2.IssueComment()
    comment.project_id = project_id
    comment.user_id = user_id
    comment.content = content or ''
    comment.is_spam = is_spam
    comment.is_description = is_description
    if not timestamp:
      timestamp = int(time.time())
    comment.timestamp = int(timestamp)
    if inbound_message:
      comment.inbound_message = inbound_message
    if amendments:
      logging.info('amendments is %r', amendments)
      comment.amendments.extend(amendments)
    if approval_id:
      comment.approval_id = approval_id

    if attachments:
      for filename, body, mimetype in attachments:
        # Upload the attachment bytes to Cloud Storage now; the SQL
        # attachment row is written later by InsertComment.
        gcs_object_id = gcs_helpers.StoreObjectInGCS(
            body, mimetype, project_id, filename=filename)
        attach = tracker_pb2.Attachment()
        # attachment id is determined later by the SQL DB.
        attach.filename = filename
        attach.filesize = len(body)
        attach.mimetype = mimetype
        attach.gcs_object_id = gcs_object_id
        comment.attachments.extend([attach])
        logging.info("Save attachment with object_id: %s" % gcs_object_id)

    if kept_attachments:
      for kept_attach in kept_attachments:
        # Kept attachments reuse the existing GCS object; no new upload.
        # NOTE(review): the [3:] slice assumes the Attachment row layout
        # (id, issue_id, comment_id, filename, filesize, mimetype, deleted,
        # gcs_object_id) -- confirm against ATTACHMENT_COLS.
        (filename, filesize, mimetype, deleted,
         gcs_object_id) = kept_attach[3:]
        new_attach = tracker_pb2.Attachment(
            filename=filename, filesize=filesize, mimetype=mimetype,
            deleted=bool(deleted), gcs_object_id=gcs_object_id)
        comment.attachments.append(new_attach)
        logging.info("Copy attachment with object_id: %s" % gcs_object_id)

    if importer_id:
      comment.importer_id = importer_id

    return comment
2297
  def CreateIssueComment(
      self, cnxn, issue, user_id, content, inbound_message=None,
      amendments=None, attachments=None, kept_attachments=None, timestamp=None,
      is_spam=False, is_description=False, approval_id=None, commit=True,
      importer_id=None):
    """Create and store a new comment on the specified issue.

    Args:
      cnxn: connection to SQL database.
      issue: the issue on which to add the comment, must be loaded from
          database with use_cache=False so that assume_stale == False.
      user_id: the user ID of the user who entered the comment.
      content: string body of the comment.
      inbound_message: optional string full text of an email that caused
          this comment to be added.
      amendments: list of Amendment PBs describing the
          metadata changes that the user made along w/ comment.
      attachments: [(filename, contents, mimetype),...] attachments uploaded at
          the time the comment was made.
      kept_attachments: list of attachment ids for attachments kept from
          previous descriptions, if the comment is an update to the description
      timestamp: time at which the comment was made, defaults to now.
      is_spam: True if the comment is classified as spam.
      is_description: True if the comment is a description for the issue.
      approval_id: id, if any, of the APPROVAL_TYPE FieldDef this comment
          belongs to.
      commit: set to False to not commit to DB yet.
      importer_id: user ID of an API client that is importing issues.

    Returns:
      The new IssueComment protocol buffer.

    Note that we assume that the content is safe to echo out
    again. The content may have some markup done during input
    processing.
    """
    # Kept attachments only make sense when replacing a description;
    # resolve the ids to Attachment rows here.
    if is_description:
      kept_attachments = self.GetAttachmentsByID(cnxn, kept_attachments)
    else:
      kept_attachments = []

    comment = self._MakeIssueComment(
        issue.project_id, user_id, content, amendments=amendments,
        inbound_message=inbound_message, attachments=attachments,
        timestamp=timestamp, is_spam=is_spam, is_description=is_description,
        kept_attachments=kept_attachments, approval_id=approval_id,
        importer_id=importer_id)
    comment.issue_id = issue.issue_id

    # Keep the issue's denormalized attachment_count in sync.
    if attachments or kept_attachments:
      issue.attachment_count = (
          issue.attachment_count + len(attachments) + len(kept_attachments))
      self.UpdateIssue(cnxn, issue, update_cols=['attachment_count'])

    # Bump the ts_mon counter of created comments.
    self.comment_creations.increment()
    self.InsertComment(cnxn, comment, commit=commit)

    return comment
2356
2357 def SoftDeleteComment(
2358 self, cnxn, issue, issue_comment, deleted_by_user_id,
2359 user_service, delete=True, reindex=False, is_spam=False):
2360 """Mark comment as un/deleted, which shows/hides it from average users."""
2361 # Update number of attachments
2362 attachments = 0
2363 if issue_comment.attachments:
2364 for attachment in issue_comment.attachments:
2365 if not attachment.deleted:
2366 attachments += 1
2367
2368 # Delete only if it's not in deleted state
2369 if delete:
2370 if not issue_comment.deleted_by:
2371 issue_comment.deleted_by = deleted_by_user_id
2372 issue.attachment_count = issue.attachment_count - attachments
2373
2374 # Undelete only if it's in deleted state
2375 elif issue_comment.deleted_by:
2376 issue_comment.deleted_by = 0
2377 issue.attachment_count = issue.attachment_count + attachments
2378
2379 issue_comment.is_spam = is_spam
2380 self._UpdateComment(
2381 cnxn, issue_comment, update_cols=['deleted_by', 'is_spam'])
2382 self.UpdateIssue(cnxn, issue, update_cols=['attachment_count'])
2383
2384 # Reindex the issue to take the comment deletion/undeletion into account.
2385 if reindex:
2386 tracker_fulltext.IndexIssues(
2387 cnxn, [issue], user_service, self, self._config_service)
2388 else:
2389 self.EnqueueIssuesForIndexing(cnxn, [issue.issue_id])
2390
2391 ### Approvals
2392
2393 def GetIssueApproval(self, cnxn, issue_id, approval_id, use_cache=True):
2394 """Retrieve the specified approval for the specified issue."""
2395 issue = self.GetIssue(cnxn, issue_id, use_cache=use_cache)
2396 approval = tracker_bizobj.FindApprovalValueByID(
2397 approval_id, issue.approval_values)
2398 if approval:
2399 return issue, approval
2400 raise exceptions.NoSuchIssueApprovalException()
2401
  def DeltaUpdateIssueApproval(
      self, cnxn, modifier_id, config, issue, approval, approval_delta,
      comment_content=None, is_description=False, attachments=None,
      commit=True, kept_attachments=None):
    """Update the issue's approval in the database.

    Applies the delta both to the in-memory approval/issue PBs and to the
    DB, then records the changes as amendments on a new comment.

    Args:
      cnxn: connection to SQL database.
      modifier_id: user ID of the user making the change.
      config: ProjectIssueConfig for the issue's project.
      issue: Issue PB that owns the approval; mutated in RAM.
      approval: ApprovalValue PB to update; mutated in RAM.
      approval_delta: delta object holding status, approver, subfield,
        and label changes to apply.
      comment_content: optional string body for the change comment.
      is_description: True if the comment is a new approval description.
      attachments: optional attachments uploaded with the comment.
      commit: set to False to skip the DB commit and do it in the caller.
      kept_attachments: optional attachment ids kept from previous
        descriptions.

    Returns:
      The new IssueComment protocol buffer recording the amendments.
    """
    amendments = []

    # Update status in RAM and DB and create status amendment.
    if approval_delta.status:
      approval.status = approval_delta.status
      approval.set_on = approval_delta.set_on or int(time.time())
      approval.setter_id = modifier_id
      status_amendment = tracker_bizobj.MakeApprovalStatusAmendment(
          approval_delta.status)
      amendments.append(status_amendment)

      self._UpdateIssueApprovalStatus(
          cnxn, issue.issue_id, approval.approval_id, approval.status,
          approval.setter_id, approval.set_on)

    # Update approver_ids in RAM and DB and create approver amendment.
    # Filter to only the adds/removes that actually change membership.
    approvers_add = [approver for approver in approval_delta.approver_ids_add
                     if approver not in approval.approver_ids]
    approvers_remove = [approver for approver in
                        approval_delta.approver_ids_remove
                        if approver in approval.approver_ids]
    if approvers_add or approvers_remove:
      approver_ids = [approver for approver in
                      list(approval.approver_ids) + approvers_add
                      if approver not in approvers_remove]
      approval.approver_ids = approver_ids
      approvers_amendment = tracker_bizobj.MakeApprovalApproversAmendment(
          approvers_add, approvers_remove)
      amendments.append(approvers_amendment)

      self._UpdateIssueApprovalApprovers(
          cnxn, issue.issue_id, approval.approval_id, approver_ids)

    # Apply subfield value changes to the issue and persist them.
    fv_amendments = tracker_bizobj.ApplyFieldValueChanges(
        issue, config, approval_delta.subfield_vals_add,
        approval_delta.subfield_vals_remove, approval_delta.subfields_clear)
    amendments.extend(fv_amendments)
    if fv_amendments:
      self._UpdateIssuesFields(cnxn, [issue], commit=False)

    # Apply label changes to the issue and persist them.
    label_amendment = tracker_bizobj.ApplyLabelChanges(
        issue, config, approval_delta.labels_add, approval_delta.labels_remove)
    if label_amendment:
      amendments.append(label_amendment)
      self._UpdateIssuesLabels(cnxn, [issue], commit=False)

    # Record all amendments on a single comment; commit is deferred so the
    # whole delta lands in one transaction.
    comment_pb = self.CreateIssueComment(
        cnxn, issue, modifier_id, comment_content, amendments=amendments,
        approval_id=approval.approval_id, is_description=is_description,
        attachments=attachments, commit=False,
        kept_attachments=kept_attachments)

    if commit:
      cnxn.Commit()
    self.issue_2lc.InvalidateKeys(cnxn, [issue.issue_id])

    return comment_pb
2464
2465 def _UpdateIssueApprovalStatus(
2466 self, cnxn, issue_id, approval_id, status, setter_id, set_on):
2467 """Update the approvalvalue for the given issue_id's issue."""
2468 set_on = set_on or int(time.time())
2469 delta = {
2470 'status': status.name.lower(),
2471 'setter_id': setter_id,
2472 'set_on': set_on,
2473 }
2474 self.issue2approvalvalue_tbl.Update(
2475 cnxn, delta, approval_id=approval_id, issue_id=issue_id,
2476 commit=False)
2477
2478 def _UpdateIssueApprovalApprovers(
2479 self, cnxn, issue_id, approval_id, approver_ids):
2480 """Update the list of approvers allowed to approve an issue's approval."""
2481 self.issueapproval2approver_tbl.Delete(
2482 cnxn, issue_id=issue_id, approval_id=approval_id, commit=False)
2483 self.issueapproval2approver_tbl.InsertRows(
2484 cnxn, ISSUEAPPROVAL2APPROVER_COLS, [(approval_id, approver_id, issue_id)
2485 for approver_id in approver_ids],
2486 commit=False)
2487
2488 ### Attachments
2489
  def GetAttachmentAndContext(self, cnxn, attachment_id):
    """Load a IssueAttachment from database, and its comment ID and IID.

    Args:
      cnxn: connection to SQL database.
      attachment_id: long integer unique ID of desired issue attachment.

    Returns:
      An Attachment protocol buffer that contains metadata about the attached
      file, along with the comment ID and issue IID of the comment and issue
      that contain this attachment.

    Raises:
      NoSuchAttachmentException: attachment_id was None, no such row exists,
        or the attachment row is marked as deleted.
    """
    if attachment_id is None:
      raise exceptions.NoSuchAttachmentException()

    attachment_row = self.attachment_tbl.SelectRow(
        cnxn, cols=ATTACHMENT_COLS, id=attachment_id)
    if attachment_row:
      (attach_id, issue_id, comment_id, filename, filesize, mimetype,
       deleted, gcs_object_id) = attachment_row
      # Soft-deleted attachments are treated as nonexistent.
      if not deleted:
        attachment = tracker_pb2.Attachment(
            attachment_id=attach_id, filename=filename, filesize=filesize,
            mimetype=mimetype, deleted=bool(deleted),
            gcs_object_id=gcs_object_id)
        return attachment, comment_id, issue_id

    raise exceptions.NoSuchAttachmentException()
2521
  def GetAttachmentsByID(self, cnxn, attachment_ids):
    """Return attachment rows for the given attachment ids.

    Args:
      cnxn: connection to SQL database.
      attachment_ids: a list of attachment ids.

    Returns:
      A list of Attachment table rows (tuples in ATTACHMENT_COLS order) for
      the attachments with these ids.
      NOTE(review): despite the original docstring's claim, this returns the
      raw rows from attachment_tbl.Select, not Attachment protocol buffers —
      confirm that callers expect row tuples.
    """
    attachment_rows = self.attachment_tbl.Select(
        cnxn, cols=ATTACHMENT_COLS, id=attachment_ids)

    return attachment_rows
2537
2538 def _UpdateAttachment(self, cnxn, comment, attach, update_cols=None):
2539 """Update attachment metadata in the DB.
2540
2541 Args:
2542 cnxn: connection to SQL database.
2543 comment: IssueComment PB to invalidate in the cache.
2544 attach: IssueAttachment PB to update in the DB.
2545 update_cols: optional list of just the field names to update.
2546 """
2547 delta = {
2548 'filename': attach.filename,
2549 'filesize': attach.filesize,
2550 'mimetype': attach.mimetype,
2551 'deleted': bool(attach.deleted),
2552 }
2553 if update_cols is not None:
2554 delta = {key: val for key, val in delta.items()
2555 if key in update_cols}
2556
2557 self.attachment_tbl.Update(cnxn, delta, id=attach.attachment_id)
2558 self.comment_2lc.InvalidateKeys(cnxn, [comment.id])
2559
2560 def SoftDeleteAttachment(
2561 self, cnxn, issue, issue_comment, attach_id, user_service, delete=True,
2562 index_now=False):
2563 """Mark attachment as un/deleted, which shows/hides it from avg users."""
2564 attachment = None
2565 for attach in issue_comment.attachments:
2566 if attach.attachment_id == attach_id:
2567 attachment = attach
2568
2569 if not attachment:
2570 logging.warning(
2571 'Tried to (un)delete non-existent attachment #%s in project '
2572 '%s issue %s', attach_id, issue.project_id, issue.local_id)
2573 return
2574
2575 if not issue_comment.deleted_by:
2576 # Decrement attachment count only if it's not in deleted state
2577 if delete:
2578 if not attachment.deleted:
2579 issue.attachment_count = issue.attachment_count - 1
2580
2581 # Increment attachment count only if it's in deleted state
2582 elif attachment.deleted:
2583 issue.attachment_count = issue.attachment_count + 1
2584
2585 logging.info('attachment.deleted was %s', attachment.deleted)
2586
2587 attachment.deleted = delete
2588
2589 logging.info('attachment.deleted is %s', attachment.deleted)
2590
2591 self._UpdateAttachment(
2592 cnxn, issue_comment, attachment, update_cols=['deleted'])
2593 self.UpdateIssue(cnxn, issue, update_cols=['attachment_count'])
2594
2595 if index_now:
2596 tracker_fulltext.IndexIssues(
2597 cnxn, [issue], user_service, self, self._config_service)
2598 else:
2599 self.EnqueueIssuesForIndexing(cnxn, [issue.issue_id])
2600
2601 ### Reindex queue
2602
2603 def EnqueueIssuesForIndexing(self, cnxn, issue_ids, commit=True):
2604 # type: (MonorailConnection, Collection[int], Optional[bool]) -> None
2605 """Add the given issue IDs to the ReindexQueue table."""
2606 reindex_rows = [(issue_id,) for issue_id in issue_ids]
2607 self.reindexqueue_tbl.InsertRows(
2608 cnxn, ['issue_id'], reindex_rows, ignore=True, commit=commit)
2609
2610 def ReindexIssues(self, cnxn, num_to_reindex, user_service):
2611 """Reindex some issues specified in the IndexQueue table."""
2612 rows = self.reindexqueue_tbl.Select(
2613 cnxn, order_by=[('created', [])], limit=num_to_reindex)
2614 issue_ids = [row[0] for row in rows]
2615
2616 if issue_ids:
2617 issues = self.GetIssues(cnxn, issue_ids)
2618 tracker_fulltext.IndexIssues(
2619 cnxn, issues, user_service, self, self._config_service)
2620 self.reindexqueue_tbl.Delete(cnxn, issue_id=issue_ids)
2621
2622 return len(issue_ids)
2623
2624 ### Search functions
2625
2626 def RunIssueQuery(
2627 self, cnxn, left_joins, where, order_by, shard_id=None, limit=None):
2628 """Run a SQL query to find matching issue IDs.
2629
2630 Args:
2631 cnxn: connection to SQL database.
2632 left_joins: list of SQL LEFT JOIN clauses.
2633 where: list of SQL WHERE clauses.
2634 order_by: list of SQL ORDER BY clauses.
2635 shard_id: int shard ID to focus the search.
2636 limit: int maximum number of results, defaults to
2637 settings.search_limit_per_shard.
2638
2639 Returns:
2640 (issue_ids, capped) where issue_ids is a list of the result issue IDs,
2641 and capped is True if the number of results reached the limit.
2642 """
2643 limit = limit or settings.search_limit_per_shard
2644 where = where + [('Issue.deleted = %s', [False])]
2645 rows = self.issue_tbl.Select(
2646 cnxn, shard_id=shard_id, distinct=True, cols=['Issue.id'],
2647 left_joins=left_joins, where=where, order_by=order_by,
2648 limit=limit)
2649 issue_ids = [row[0] for row in rows]
2650 capped = len(issue_ids) >= limit
2651 return issue_ids, capped
2652
2653 def GetIIDsByLabelIDs(self, cnxn, label_ids, project_id, shard_id):
2654 """Return a list of IIDs for issues with any of the given label IDs."""
2655 if not label_ids:
2656 return []
2657 where = []
2658 if shard_id is not None:
2659 slice_term = ('shard = %s', [shard_id])
2660 where.append(slice_term)
2661
2662 rows = self.issue_tbl.Select(
2663 cnxn, shard_id=shard_id, cols=['id'],
2664 left_joins=[('Issue2Label ON Issue.id = Issue2Label.issue_id', [])],
2665 label_id=label_ids, project_id=project_id, where=where)
2666 return [row[0] for row in rows]
2667
2668 def GetIIDsByParticipant(self, cnxn, user_ids, project_ids, shard_id):
2669 """Return IIDs for issues where any of the given users participate."""
2670 iids = []
2671 where = []
2672 if shard_id is not None:
2673 where.append(('shard = %s', [shard_id]))
2674 if project_ids:
2675 cond_str = 'Issue.project_id IN (%s)' % sql.PlaceHolders(project_ids)
2676 where.append((cond_str, project_ids))
2677
2678 # TODO(jrobbins): Combine these 3 queries into one with ORs. It currently
2679 # is not the bottleneck.
2680 rows = self.issue_tbl.Select(
2681 cnxn, cols=['id'], reporter_id=user_ids,
2682 where=where, shard_id=shard_id)
2683 for row in rows:
2684 iids.append(row[0])
2685
2686 rows = self.issue_tbl.Select(
2687 cnxn, cols=['id'], owner_id=user_ids,
2688 where=where, shard_id=shard_id)
2689 for row in rows:
2690 iids.append(row[0])
2691
2692 rows = self.issue_tbl.Select(
2693 cnxn, cols=['id'], derived_owner_id=user_ids,
2694 where=where, shard_id=shard_id)
2695 for row in rows:
2696 iids.append(row[0])
2697
2698 rows = self.issue_tbl.Select(
2699 cnxn, cols=['id'],
2700 left_joins=[('Issue2Cc ON Issue2Cc.issue_id = Issue.id', [])],
2701 cc_id=user_ids,
2702 where=where + [('cc_id IS NOT NULL', [])],
2703 shard_id=shard_id)
2704 for row in rows:
2705 iids.append(row[0])
2706
2707 rows = self.issue_tbl.Select(
2708 cnxn, cols=['Issue.id'],
2709 left_joins=[
2710 ('Issue2FieldValue ON Issue.id = Issue2FieldValue.issue_id', []),
2711 ('FieldDef ON Issue2FieldValue.field_id = FieldDef.id', [])],
2712 user_id=user_ids, grants_perm='View',
2713 where=where + [('user_id IS NOT NULL', [])],
2714 shard_id=shard_id)
2715 for row in rows:
2716 iids.append(row[0])
2717
2718 return iids
2719
2720 ### Issue Dependency Rankings
2721
2722 def SortBlockedOn(self, cnxn, issue, blocked_on_iids):
2723 """Sort blocked_on dependencies by rank and dst_issue_id.
2724
2725 Args:
2726 cnxn: connection to SQL database.
2727 issue: the issue being blocked.
2728 blocked_on_iids: the iids of all the issue's blockers
2729
2730 Returns:
2731 a tuple (ids, ranks), where ids is the sorted list of
2732 blocked_on_iids and ranks is the list of corresponding ranks
2733 """
2734 rows = self.issuerelation_tbl.Select(
2735 cnxn, cols=ISSUERELATION_COLS, issue_id=issue.issue_id,
2736 dst_issue_id=blocked_on_iids, kind='blockedon',
2737 order_by=[('rank DESC', []), ('dst_issue_id', [])])
2738 ids = [row[1] for row in rows]
2739 ids.extend([iid for iid in blocked_on_iids if iid not in ids])
2740 ranks = [row[3] for row in rows]
2741 ranks.extend([0] * (len(blocked_on_iids) - len(ranks)))
2742 return ids, ranks
2743
2744 def ApplyIssueRerank(
2745 self, cnxn, parent_id, relations_to_change, commit=True, invalidate=True):
2746 """Updates rankings of blocked on issue relations to new values
2747
2748 Args:
2749 cnxn: connection to SQL database.
2750 parent_id: the global ID of the blocked issue to update
2751 relations_to_change: This should be a list of
2752 [(blocker_id, new_rank),...] of relations that need to be changed
2753 commit: set to False to skip the DB commit and do it in the caller.
2754 invalidate: set to False to leave cache invalidatation to the caller.
2755 """
2756 blocker_ids = [blocker for (blocker, rank) in relations_to_change]
2757 self.issuerelation_tbl.Delete(
2758 cnxn, issue_id=parent_id, dst_issue_id=blocker_ids, commit=False)
2759 insert_rows = [(parent_id, blocker, 'blockedon', rank)
2760 for (blocker, rank) in relations_to_change]
2761 self.issuerelation_tbl.InsertRows(
2762 cnxn, cols=ISSUERELATION_COLS, row_values=insert_rows, commit=commit)
2763 if invalidate:
2764 self.InvalidateIIDs(cnxn, [parent_id])
2765
2766 # Expunge Users from Issues system.
  def ExpungeUsersInIssues(self, cnxn, user_ids_by_email, limit=None):
    # type: (MonorailConnection, Dict[str, int], Optional[int]) -> List[int]
    """Removes all references to given users from issue DB tables.

    This method will not commit the operations. This method will
    not make changes to in-memory data.

    Args:
      cnxn: connection to SQL database.
      user_ids_by_email: dict of {email: user_id} of all users we want
        to expunge.
      limit: Optional, the limit for each operation.

    Returns:
      A list of issue_ids that need to be reindexed.
    """
    # Every write below passes commit=False; the caller owns the commit.
    commit = False
    user_ids = list(user_ids_by_email.values())
    user_emails = list(user_ids_by_email.keys())
    # Track issue_ids for issues that will have different search documents
    # as a result of removing users.
    affected_issue_ids = []

    # Reassign commenter_id and delete inbound_messages.
    shard_id = sql.RandomShardID()
    comment_content_id_rows = self.comment_tbl.Select(
        cnxn, cols=['Comment.id', 'Comment.issue_id', 'commentcontent_id'],
        commenter_id=user_ids, shard_id=shard_id, limit=limit)
    comment_ids = [row[0] for row in comment_content_id_rows]
    commentcontent_ids = [row[2] for row in comment_content_id_rows]
    if commentcontent_ids:
      self.commentcontent_tbl.Update(
          cnxn, {'inbound_message': None}, id=commentcontent_ids, commit=commit)
    if comment_ids:
      self.comment_tbl.Update(
          cnxn, {'commenter_id': framework_constants.DELETED_USER_ID},
          id=comment_ids,
          commit=commit)
      affected_issue_ids.extend([row[1] for row in comment_content_id_rows])

    # Reassign deleted_by comments deleted_by.
    self.comment_tbl.Update(
        cnxn,
        {'deleted_by': framework_constants.DELETED_USER_ID},
        deleted_by=user_ids,
        commit=commit, limit=limit)

    # Remove users in field values.
    fv_issue_id_rows = self.issue2fieldvalue_tbl.Select(
        cnxn, cols=['issue_id'], user_id=user_ids, limit=limit)
    fv_issue_ids = [row[0] for row in fv_issue_id_rows]
    self.issue2fieldvalue_tbl.Delete(
        cnxn, user_id=user_ids, limit=limit, commit=commit)
    affected_issue_ids.extend(fv_issue_ids)

    # Remove users in approval values.
    self.issueapproval2approver_tbl.Delete(
        cnxn, approver_id=user_ids, commit=commit, limit=limit)
    self.issue2approvalvalue_tbl.Update(
        cnxn,
        {'setter_id': framework_constants.DELETED_USER_ID},
        setter_id=user_ids,
        commit=commit, limit=limit)

    # Remove users in issue Ccs.
    cc_issue_id_rows = self.issue2cc_tbl.Select(
        cnxn, cols=['issue_id'], cc_id=user_ids, limit=limit)
    cc_issue_ids = [row[0] for row in cc_issue_id_rows]
    self.issue2cc_tbl.Delete(
        cnxn, cc_id=user_ids, limit=limit, commit=commit)
    affected_issue_ids.extend(cc_issue_ids)

    # Remove users in issue owners.
    owner_issue_id_rows = self.issue_tbl.Select(
        cnxn, cols=['id'], owner_id=user_ids, limit=limit)
    owner_issue_ids = [row[0] for row in owner_issue_id_rows]
    if owner_issue_ids:
      self.issue_tbl.Update(
          cnxn, {'owner_id': None}, id=owner_issue_ids, commit=commit)
      affected_issue_ids.extend(owner_issue_ids)
    derived_owner_issue_id_rows = self.issue_tbl.Select(
        cnxn, cols=['id'], derived_owner_id=user_ids, limit=limit)
    derived_owner_issue_ids = [row[0] for row in derived_owner_issue_id_rows]
    if derived_owner_issue_ids:
      self.issue_tbl.Update(
          cnxn, {'derived_owner_id': None},
          id=derived_owner_issue_ids,
          commit=commit)
      affected_issue_ids.extend(derived_owner_issue_ids)

    # Remove users in issue reporters.
    reporter_issue_id_rows = self.issue_tbl.Select(
        cnxn, cols=['id'], reporter_id=user_ids, limit=limit)
    reporter_issue_ids = [row[0] for row in reporter_issue_id_rows]
    if reporter_issue_ids:
      self.issue_tbl.Update(
          cnxn, {'reporter_id': framework_constants.DELETED_USER_ID},
          id=reporter_issue_ids,
          commit=commit)
      affected_issue_ids.extend(reporter_issue_ids)

    # Note: issueupdate_tbl's and issue2notify's user_id columns do not
    # reference the User table. So all values need to updated here before
    # User rows can be deleted safely. No limit will be applied.

    # Remove users in issue updates.
    self.issueupdate_tbl.Update(
        cnxn,
        {'added_user_id': framework_constants.DELETED_USER_ID},
        added_user_id=user_ids,
        commit=commit)
    self.issueupdate_tbl.Update(
        cnxn,
        {'removed_user_id': framework_constants.DELETED_USER_ID},
        removed_user_id=user_ids,
        commit=commit)

    # Remove users in issue notify.
    self.issue2notify_tbl.Delete(
        cnxn, email=user_emails, commit=commit)

    # Remove users in issue snapshots.
    self.issuesnapshot_tbl.Update(
        cnxn,
        {'owner_id': framework_constants.DELETED_USER_ID},
        owner_id=user_ids,
        commit=commit, limit=limit)
    self.issuesnapshot_tbl.Update(
        cnxn,
        {'reporter_id': framework_constants.DELETED_USER_ID},
        reporter_id=user_ids,
        commit=commit, limit=limit)
    self.issuesnapshot2cc_tbl.Delete(
        cnxn, cc_id=user_ids, commit=commit, limit=limit)

    # Dedupe so each affected issue is reindexed only once.
    return list(set(affected_issue_ids))